1689aea55527f7c905765de74a4dbb79413b4bbf
[linux-2.6-microblaze.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/kthread.h>
60 #include <linux/blkdev.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
63 #include <net/sock.h>
64 #include <net/af_unix.h>
65 #include <net/scm.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/hugetlb.h>
72 #include <linux/highmem.h>
73
74 #define CREATE_TRACE_POINTS
75 #include <trace/events/io_uring.h>
76
77 #include <uapi/linux/io_uring.h>
78
79 #include "internal.h"
80 #include "io-wq.h"
81
82 #define IORING_MAX_ENTRIES      32768
83 #define IORING_MAX_CQ_ENTRIES   (2 * IORING_MAX_ENTRIES)
84
85 /*
86  * Shift of 9 is 512 entries, or exactly one page on 64-bit archs
87  */
88 #define IORING_FILE_TABLE_SHIFT 9
89 #define IORING_MAX_FILES_TABLE  (1U << IORING_FILE_TABLE_SHIFT)
90 #define IORING_FILE_TABLE_MASK  (IORING_MAX_FILES_TABLE - 1)
91 #define IORING_MAX_FIXED_FILES  (64 * IORING_MAX_FILES_TABLE)
92
93 struct io_uring {
94         u32 head ____cacheline_aligned_in_smp;
95         u32 tail ____cacheline_aligned_in_smp;
96 };
97
98 /*
99  * This data is shared with the application through the mmap at offsets
100  * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
101  *
102  * The offsets to the member fields are published through struct
103  * io_sqring_offsets when calling io_uring_setup.
104  */
105 struct io_rings {
106         /*
107          * Head and tail offsets into the ring; the offsets need to be
108          * masked to get valid indices.
109          *
110          * The kernel controls head of the sq ring and the tail of the cq ring,
111          * and the application controls tail of the sq ring and the head of the
112          * cq ring.
113          */
114         struct io_uring         sq, cq;
115         /*
116          * Bitmasks to apply to head and tail offsets (constant, equals
117          * ring_entries - 1)
118          */
119         u32                     sq_ring_mask, cq_ring_mask;
120         /* Ring sizes (constant, power of 2) */
121         u32                     sq_ring_entries, cq_ring_entries;
122         /*
123          * Number of invalid entries dropped by the kernel due to
124          * invalid index stored in array
125          *
126          * Written by the kernel, shouldn't be modified by the
127          * application (i.e. get number of "new events" by comparing to
128          * cached value).
129          *
130          * After a new SQ head value was read by the application this
131          * counter includes all submissions that were dropped reaching
132          * the new SQ head (and possibly more).
133          */
134         u32                     sq_dropped;
135         /*
136          * Runtime flags
137          *
138          * Written by the kernel, shouldn't be modified by the
139          * application.
140          *
141          * The application needs a full memory barrier before checking
142          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
143          */
144         u32                     sq_flags;
145         /*
146          * Number of completion events lost because the queue was full;
147          * this should be avoided by the application by making sure
148          * there are not more requests pending thatn there is space in
149          * the completion queue.
150          *
151          * Written by the kernel, shouldn't be modified by the
152          * application (i.e. get number of "new events" by comparing to
153          * cached value).
154          *
155          * As completion events come in out of order this counter is not
156          * ordered with any other data.
157          */
158         u32                     cq_overflow;
159         /*
160          * Ring buffer of completion events.
161          *
162          * The kernel writes completion events fresh every time they are
163          * produced, so the application is allowed to modify pending
164          * entries.
165          */
166         struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
167 };
168
169 struct io_mapped_ubuf {
170         u64             ubuf;
171         size_t          len;
172         struct          bio_vec *bvec;
173         unsigned int    nr_bvecs;
174 };
175
176 struct fixed_file_table {
177         struct file             **files;
178 };
179
180 struct io_ring_ctx {
181         struct {
182                 struct percpu_ref       refs;
183         } ____cacheline_aligned_in_smp;
184
185         struct {
186                 unsigned int            flags;
187                 bool                    compat;
188                 bool                    account_mem;
189                 bool                    cq_overflow_flushed;
190                 bool                    drain_next;
191
192                 /*
193                  * Ring buffer of indices into array of io_uring_sqe, which is
194                  * mmapped by the application using the IORING_OFF_SQES offset.
195                  *
196                  * This indirection could e.g. be used to assign fixed
197                  * io_uring_sqe entries to operations and only submit them to
198                  * the queue when needed.
199                  *
200                  * The kernel modifies neither the indices array nor the entries
201                  * array.
202                  */
203                 u32                     *sq_array;
204                 unsigned                cached_sq_head;
205                 unsigned                sq_entries;
206                 unsigned                sq_mask;
207                 unsigned                sq_thread_idle;
208                 unsigned                cached_sq_dropped;
209                 atomic_t                cached_cq_overflow;
210                 struct io_uring_sqe     *sq_sqes;
211
212                 struct list_head        defer_list;
213                 struct list_head        timeout_list;
214                 struct list_head        cq_overflow_list;
215
216                 wait_queue_head_t       inflight_wait;
217         } ____cacheline_aligned_in_smp;
218
219         struct io_rings *rings;
220
221         /* IO offload */
222         struct io_wq            *io_wq;
223         struct task_struct      *sqo_thread;    /* if using sq thread polling */
224         struct mm_struct        *sqo_mm;
225         wait_queue_head_t       sqo_wait;
226
227         /*
228          * If used, fixed file set. Writers must ensure that ->refs is dead,
229          * readers must ensure that ->refs is alive as long as the file* is
230          * used. Only updated through io_uring_register(2).
231          */
232         struct fixed_file_table *file_table;
233         unsigned                nr_user_files;
234
235         /* if used, fixed mapped user buffers */
236         unsigned                nr_user_bufs;
237         struct io_mapped_ubuf   *user_bufs;
238
239         struct user_struct      *user;
240
241         const struct cred       *creds;
242
243         /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */
244         struct completion       *completions;
245
246         /* if all else fails... */
247         struct io_kiocb         *fallback_req;
248
249 #if defined(CONFIG_UNIX)
250         struct socket           *ring_sock;
251 #endif
252
253         struct {
254                 unsigned                cached_cq_tail;
255                 unsigned                cq_entries;
256                 unsigned                cq_mask;
257                 atomic_t                cq_timeouts;
258                 struct wait_queue_head  cq_wait;
259                 struct fasync_struct    *cq_fasync;
260                 struct eventfd_ctx      *cq_ev_fd;
261         } ____cacheline_aligned_in_smp;
262
263         struct {
264                 struct mutex            uring_lock;
265                 wait_queue_head_t       wait;
266         } ____cacheline_aligned_in_smp;
267
268         struct {
269                 spinlock_t              completion_lock;
270                 bool                    poll_multi_file;
271                 /*
272                  * ->poll_list is protected by the ctx->uring_lock for
273                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
274                  * For SQPOLL, only the single threaded io_sq_thread() will
275                  * manipulate the list, hence no extra locking is needed there.
276                  */
277                 struct list_head        poll_list;
278                 struct rb_root          cancel_tree;
279
280                 spinlock_t              inflight_lock;
281                 struct list_head        inflight_list;
282         } ____cacheline_aligned_in_smp;
283 };
284
285 /*
286  * First field must be the file pointer in all the
287  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
288  */
289 struct io_poll_iocb {
290         struct file                     *file;
291         struct wait_queue_head          *head;
292         __poll_t                        events;
293         bool                            done;
294         bool                            canceled;
295         struct wait_queue_entry         *wait;
296 };
297
298 struct io_timeout_data {
299         struct io_kiocb                 *req;
300         struct hrtimer                  timer;
301         struct timespec64               ts;
302         enum hrtimer_mode               mode;
303         u32                             seq_offset;
304 };
305
306 struct io_timeout {
307         struct file                     *file;
308         struct io_timeout_data          *data;
309 };
310
311 struct io_async_rw {
312         struct iovec                    fast_iov[UIO_FASTIOV];
313         struct iovec                    *iov;
314         ssize_t                         nr_segs;
315         ssize_t                         size;
316 };
317
318 struct io_async_ctx {
319         struct io_uring_sqe             sqe;
320         union {
321                 struct io_async_rw      rw;
322         };
323 };
324
325 /*
326  * NOTE! Each of the iocb union members has the file pointer
327  * as the first entry in their struct definition. So you can
328  * access the file pointer through any of the sub-structs,
329  * or directly as just 'ki_filp' in this struct.
330  */
331 struct io_kiocb {
332         union {
333                 struct file             *file;
334                 struct kiocb            rw;
335                 struct io_poll_iocb     poll;
336                 struct io_timeout       timeout;
337         };
338
339         const struct io_uring_sqe       *sqe;
340         struct io_async_ctx             *io;
341         struct file                     *ring_file;
342         int                             ring_fd;
343         bool                            has_user;
344         bool                            in_async;
345         bool                            needs_fixed_file;
346
347         struct io_ring_ctx      *ctx;
348         union {
349                 struct list_head        list;
350                 struct rb_node          rb_node;
351         };
352         struct list_head        link_list;
353         unsigned int            flags;
354         refcount_t              refs;
355 #define REQ_F_NOWAIT            1       /* must not punt to workers */
356 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
357 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
358 #define REQ_F_LINK_NEXT         8       /* already grabbed next link */
359 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
360 #define REQ_F_IO_DRAINED        32      /* drain done */
361 #define REQ_F_LINK              64      /* linked sqes */
362 #define REQ_F_LINK_TIMEOUT      128     /* has linked timeout */
363 #define REQ_F_FAIL_LINK         256     /* fail rest of links */
364 #define REQ_F_DRAIN_LINK        512     /* link should be fully drained */
365 #define REQ_F_TIMEOUT           1024    /* timeout request */
366 #define REQ_F_ISREG             2048    /* regular file */
367 #define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
368 #define REQ_F_TIMEOUT_NOSEQ     8192    /* no timeout sequence */
369 #define REQ_F_INFLIGHT          16384   /* on inflight list */
370 #define REQ_F_COMP_LOCKED       32768   /* completion under lock */
371         u64                     user_data;
372         u32                     result;
373         u32                     sequence;
374
375         struct list_head        inflight_entry;
376
377         struct io_wq_work       work;
378 };
379
380 #define IO_PLUG_THRESHOLD               2
381 #define IO_IOPOLL_BATCH                 8
382
383 struct io_submit_state {
384         struct blk_plug         plug;
385
386         /*
387          * io_kiocb alloc cache
388          */
389         void                    *reqs[IO_IOPOLL_BATCH];
390         unsigned                int free_reqs;
391         unsigned                int cur_req;
392
393         /*
394          * File reference cache
395          */
396         struct file             *file;
397         unsigned int            fd;
398         unsigned int            has_refs;
399         unsigned int            used_refs;
400         unsigned int            ios_left;
401 };
402
403 static void io_wq_submit_work(struct io_wq_work **workptr);
404 static void io_cqring_fill_event(struct io_kiocb *req, long res);
405 static void __io_free_req(struct io_kiocb *req);
406 static void io_put_req(struct io_kiocb *req);
407 static void io_double_put_req(struct io_kiocb *req);
408 static void __io_double_put_req(struct io_kiocb *req);
409 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
410 static void io_queue_linked_timeout(struct io_kiocb *req);
411
412 static struct kmem_cache *req_cachep;
413
414 static const struct file_operations io_uring_fops;
415
416 struct sock *io_uring_get_socket(struct file *file)
417 {
418 #if defined(CONFIG_UNIX)
419         if (file->f_op == &io_uring_fops) {
420                 struct io_ring_ctx *ctx = file->private_data;
421
422                 return ctx->ring_sock->sk;
423         }
424 #endif
425         return NULL;
426 }
427 EXPORT_SYMBOL(io_uring_get_socket);
428
429 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
430 {
431         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
432
433         complete(&ctx->completions[0]);
434 }
435
436 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
437 {
438         struct io_ring_ctx *ctx;
439
440         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
441         if (!ctx)
442                 return NULL;
443
444         ctx->fallback_req = kmem_cache_alloc(req_cachep, GFP_KERNEL);
445         if (!ctx->fallback_req)
446                 goto err;
447
448         ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL);
449         if (!ctx->completions)
450                 goto err;
451
452         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
453                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
454                 goto err;
455
456         ctx->flags = p->flags;
457         init_waitqueue_head(&ctx->cq_wait);
458         INIT_LIST_HEAD(&ctx->cq_overflow_list);
459         init_completion(&ctx->completions[0]);
460         init_completion(&ctx->completions[1]);
461         mutex_init(&ctx->uring_lock);
462         init_waitqueue_head(&ctx->wait);
463         spin_lock_init(&ctx->completion_lock);
464         INIT_LIST_HEAD(&ctx->poll_list);
465         ctx->cancel_tree = RB_ROOT;
466         INIT_LIST_HEAD(&ctx->defer_list);
467         INIT_LIST_HEAD(&ctx->timeout_list);
468         init_waitqueue_head(&ctx->inflight_wait);
469         spin_lock_init(&ctx->inflight_lock);
470         INIT_LIST_HEAD(&ctx->inflight_list);
471         return ctx;
472 err:
473         if (ctx->fallback_req)
474                 kmem_cache_free(req_cachep, ctx->fallback_req);
475         kfree(ctx->completions);
476         kfree(ctx);
477         return NULL;
478 }
479
480 static inline bool __req_need_defer(struct io_kiocb *req)
481 {
482         struct io_ring_ctx *ctx = req->ctx;
483
484         return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
485                                         + atomic_read(&ctx->cached_cq_overflow);
486 }
487
488 static inline bool req_need_defer(struct io_kiocb *req)
489 {
490         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) == REQ_F_IO_DRAIN)
491                 return __req_need_defer(req);
492
493         return false;
494 }
495
496 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
497 {
498         struct io_kiocb *req;
499
500         req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
501         if (req && !req_need_defer(req)) {
502                 list_del_init(&req->list);
503                 return req;
504         }
505
506         return NULL;
507 }
508
509 static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
510 {
511         struct io_kiocb *req;
512
513         req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
514         if (req) {
515                 if (req->flags & REQ_F_TIMEOUT_NOSEQ)
516                         return NULL;
517                 if (!__req_need_defer(req)) {
518                         list_del_init(&req->list);
519                         return req;
520                 }
521         }
522
523         return NULL;
524 }
525
526 static void __io_commit_cqring(struct io_ring_ctx *ctx)
527 {
528         struct io_rings *rings = ctx->rings;
529
530         if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
531                 /* order cqe stores with ring update */
532                 smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
533
534                 if (wq_has_sleeper(&ctx->cq_wait)) {
535                         wake_up_interruptible(&ctx->cq_wait);
536                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
537                 }
538         }
539 }
540
541 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
542 {
543         u8 opcode = READ_ONCE(sqe->opcode);
544
545         return !(opcode == IORING_OP_READ_FIXED ||
546                  opcode == IORING_OP_WRITE_FIXED);
547 }
548
549 static inline bool io_prep_async_work(struct io_kiocb *req,
550                                       struct io_kiocb **link)
551 {
552         bool do_hashed = false;
553
554         if (req->sqe) {
555                 switch (req->sqe->opcode) {
556                 case IORING_OP_WRITEV:
557                 case IORING_OP_WRITE_FIXED:
558                         do_hashed = true;
559                         /* fall-through */
560                 case IORING_OP_READV:
561                 case IORING_OP_READ_FIXED:
562                 case IORING_OP_SENDMSG:
563                 case IORING_OP_RECVMSG:
564                 case IORING_OP_ACCEPT:
565                 case IORING_OP_POLL_ADD:
566                 case IORING_OP_CONNECT:
567                         /*
568                          * We know REQ_F_ISREG is not set on some of these
569                          * opcodes, but this enables us to keep the check in
570                          * just one place.
571                          */
572                         if (!(req->flags & REQ_F_ISREG))
573                                 req->work.flags |= IO_WQ_WORK_UNBOUND;
574                         break;
575                 }
576                 if (io_sqe_needs_user(req->sqe))
577                         req->work.flags |= IO_WQ_WORK_NEEDS_USER;
578         }
579
580         *link = io_prep_linked_timeout(req);
581         return do_hashed;
582 }
583
584 static inline void io_queue_async_work(struct io_kiocb *req)
585 {
586         struct io_ring_ctx *ctx = req->ctx;
587         struct io_kiocb *link;
588         bool do_hashed;
589
590         do_hashed = io_prep_async_work(req, &link);
591
592         trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
593                                         req->flags);
594         if (!do_hashed) {
595                 io_wq_enqueue(ctx->io_wq, &req->work);
596         } else {
597                 io_wq_enqueue_hashed(ctx->io_wq, &req->work,
598                                         file_inode(req->file));
599         }
600
601         if (link)
602                 io_queue_linked_timeout(link);
603 }
604
605 static void io_kill_timeout(struct io_kiocb *req)
606 {
607         int ret;
608
609         ret = hrtimer_try_to_cancel(&req->timeout.data->timer);
610         if (ret != -1) {
611                 atomic_inc(&req->ctx->cq_timeouts);
612                 list_del_init(&req->list);
613                 io_cqring_fill_event(req, 0);
614                 io_put_req(req);
615         }
616 }
617
618 static void io_kill_timeouts(struct io_ring_ctx *ctx)
619 {
620         struct io_kiocb *req, *tmp;
621
622         spin_lock_irq(&ctx->completion_lock);
623         list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
624                 io_kill_timeout(req);
625         spin_unlock_irq(&ctx->completion_lock);
626 }
627
628 static void io_commit_cqring(struct io_ring_ctx *ctx)
629 {
630         struct io_kiocb *req;
631
632         while ((req = io_get_timeout_req(ctx)) != NULL)
633                 io_kill_timeout(req);
634
635         __io_commit_cqring(ctx);
636
637         while ((req = io_get_deferred_req(ctx)) != NULL) {
638                 req->flags |= REQ_F_IO_DRAINED;
639                 io_queue_async_work(req);
640         }
641 }
642
643 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
644 {
645         struct io_rings *rings = ctx->rings;
646         unsigned tail;
647
648         tail = ctx->cached_cq_tail;
649         /*
650          * writes to the cq entry need to come after reading head; the
651          * control dependency is enough as we're using WRITE_ONCE to
652          * fill the cq entry
653          */
654         if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
655                 return NULL;
656
657         ctx->cached_cq_tail++;
658         return &rings->cqes[tail & ctx->cq_mask];
659 }
660
661 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
662 {
663         if (waitqueue_active(&ctx->wait))
664                 wake_up(&ctx->wait);
665         if (waitqueue_active(&ctx->sqo_wait))
666                 wake_up(&ctx->sqo_wait);
667         if (ctx->cq_ev_fd)
668                 eventfd_signal(ctx->cq_ev_fd, 1);
669 }
670
671 /* Returns true if there are no backlogged entries after the flush */
672 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
673 {
674         struct io_rings *rings = ctx->rings;
675         struct io_uring_cqe *cqe;
676         struct io_kiocb *req;
677         unsigned long flags;
678         LIST_HEAD(list);
679
680         if (!force) {
681                 if (list_empty_careful(&ctx->cq_overflow_list))
682                         return true;
683                 if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
684                     rings->cq_ring_entries))
685                         return false;
686         }
687
688         spin_lock_irqsave(&ctx->completion_lock, flags);
689
690         /* if force is set, the ring is going away. always drop after that */
691         if (force)
692                 ctx->cq_overflow_flushed = true;
693
694         cqe = NULL;
695         while (!list_empty(&ctx->cq_overflow_list)) {
696                 cqe = io_get_cqring(ctx);
697                 if (!cqe && !force)
698                         break;
699
700                 req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
701                                                 list);
702                 list_move(&req->list, &list);
703                 if (cqe) {
704                         WRITE_ONCE(cqe->user_data, req->user_data);
705                         WRITE_ONCE(cqe->res, req->result);
706                         WRITE_ONCE(cqe->flags, 0);
707                 } else {
708                         WRITE_ONCE(ctx->rings->cq_overflow,
709                                 atomic_inc_return(&ctx->cached_cq_overflow));
710                 }
711         }
712
713         io_commit_cqring(ctx);
714         spin_unlock_irqrestore(&ctx->completion_lock, flags);
715         io_cqring_ev_posted(ctx);
716
717         while (!list_empty(&list)) {
718                 req = list_first_entry(&list, struct io_kiocb, list);
719                 list_del(&req->list);
720                 io_put_req(req);
721         }
722
723         return cqe != NULL;
724 }
725
726 static void io_cqring_fill_event(struct io_kiocb *req, long res)
727 {
728         struct io_ring_ctx *ctx = req->ctx;
729         struct io_uring_cqe *cqe;
730
731         trace_io_uring_complete(ctx, req->user_data, res);
732
733         /*
734          * If we can't get a cq entry, userspace overflowed the
735          * submission (by quite a lot). Increment the overflow count in
736          * the ring.
737          */
738         cqe = io_get_cqring(ctx);
739         if (likely(cqe)) {
740                 WRITE_ONCE(cqe->user_data, req->user_data);
741                 WRITE_ONCE(cqe->res, res);
742                 WRITE_ONCE(cqe->flags, 0);
743         } else if (ctx->cq_overflow_flushed) {
744                 WRITE_ONCE(ctx->rings->cq_overflow,
745                                 atomic_inc_return(&ctx->cached_cq_overflow));
746         } else {
747                 refcount_inc(&req->refs);
748                 req->result = res;
749                 list_add_tail(&req->list, &ctx->cq_overflow_list);
750         }
751 }
752
753 static void io_cqring_add_event(struct io_kiocb *req, long res)
754 {
755         struct io_ring_ctx *ctx = req->ctx;
756         unsigned long flags;
757
758         spin_lock_irqsave(&ctx->completion_lock, flags);
759         io_cqring_fill_event(req, res);
760         io_commit_cqring(ctx);
761         spin_unlock_irqrestore(&ctx->completion_lock, flags);
762
763         io_cqring_ev_posted(ctx);
764 }
765
766 static inline bool io_is_fallback_req(struct io_kiocb *req)
767 {
768         return req == (struct io_kiocb *)
769                         ((unsigned long) req->ctx->fallback_req & ~1UL);
770 }
771
772 static struct io_kiocb *io_get_fallback_req(struct io_ring_ctx *ctx)
773 {
774         struct io_kiocb *req;
775
776         req = ctx->fallback_req;
777         if (!test_and_set_bit_lock(0, (unsigned long *) ctx->fallback_req))
778                 return req;
779
780         return NULL;
781 }
782
783 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
784                                    struct io_submit_state *state)
785 {
786         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
787         struct io_kiocb *req;
788
789         if (!percpu_ref_tryget(&ctx->refs))
790                 return NULL;
791
792         if (!state) {
793                 req = kmem_cache_alloc(req_cachep, gfp);
794                 if (unlikely(!req))
795                         goto fallback;
796         } else if (!state->free_reqs) {
797                 size_t sz;
798                 int ret;
799
800                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
801                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
802
803                 /*
804                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
805                  * retry single alloc to be on the safe side.
806                  */
807                 if (unlikely(ret <= 0)) {
808                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
809                         if (!state->reqs[0])
810                                 goto fallback;
811                         ret = 1;
812                 }
813                 state->free_reqs = ret - 1;
814                 state->cur_req = 1;
815                 req = state->reqs[0];
816         } else {
817                 req = state->reqs[state->cur_req];
818                 state->free_reqs--;
819                 state->cur_req++;
820         }
821
822 got_it:
823         req->io = NULL;
824         req->ring_file = NULL;
825         req->file = NULL;
826         req->ctx = ctx;
827         req->flags = 0;
828         /* one is dropped after submission, the other at completion */
829         refcount_set(&req->refs, 2);
830         req->result = 0;
831         INIT_IO_WORK(&req->work, io_wq_submit_work);
832         return req;
833 fallback:
834         req = io_get_fallback_req(ctx);
835         if (req)
836                 goto got_it;
837         percpu_ref_put(&ctx->refs);
838         return NULL;
839 }
840
841 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
842 {
843         if (*nr) {
844                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
845                 percpu_ref_put_many(&ctx->refs, *nr);
846                 *nr = 0;
847         }
848 }
849
850 static void __io_free_req(struct io_kiocb *req)
851 {
852         struct io_ring_ctx *ctx = req->ctx;
853
854         if (req->io)
855                 kfree(req->io);
856         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
857                 fput(req->file);
858         if (req->flags & REQ_F_INFLIGHT) {
859                 unsigned long flags;
860
861                 spin_lock_irqsave(&ctx->inflight_lock, flags);
862                 list_del(&req->inflight_entry);
863                 if (waitqueue_active(&ctx->inflight_wait))
864                         wake_up(&ctx->inflight_wait);
865                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
866         }
867         if (req->flags & REQ_F_TIMEOUT)
868                 kfree(req->timeout.data);
869         percpu_ref_put(&ctx->refs);
870         if (likely(!io_is_fallback_req(req)))
871                 kmem_cache_free(req_cachep, req);
872         else
873                 clear_bit_unlock(0, (unsigned long *) ctx->fallback_req);
874 }
875
876 static bool io_link_cancel_timeout(struct io_kiocb *req)
877 {
878         struct io_ring_ctx *ctx = req->ctx;
879         int ret;
880
881         ret = hrtimer_try_to_cancel(&req->timeout.data->timer);
882         if (ret != -1) {
883                 io_cqring_fill_event(req, -ECANCELED);
884                 io_commit_cqring(ctx);
885                 req->flags &= ~REQ_F_LINK;
886                 io_put_req(req);
887                 return true;
888         }
889
890         return false;
891 }
892
893 static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
894 {
895         struct io_ring_ctx *ctx = req->ctx;
896         struct io_kiocb *nxt;
897         bool wake_ev = false;
898
899         /* Already got next link */
900         if (req->flags & REQ_F_LINK_NEXT)
901                 return;
902
903         /*
904          * The list should never be empty when we are called here. But could
905          * potentially happen if the chain is messed up, check to be on the
906          * safe side.
907          */
908         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list);
909         while (nxt) {
910                 list_del_init(&nxt->list);
911
912                 if ((req->flags & REQ_F_LINK_TIMEOUT) &&
913                     (nxt->flags & REQ_F_TIMEOUT)) {
914                         wake_ev |= io_link_cancel_timeout(nxt);
915                         nxt = list_first_entry_or_null(&req->link_list,
916                                                         struct io_kiocb, list);
917                         req->flags &= ~REQ_F_LINK_TIMEOUT;
918                         continue;
919                 }
920                 if (!list_empty(&req->link_list)) {
921                         INIT_LIST_HEAD(&nxt->link_list);
922                         list_splice(&req->link_list, &nxt->link_list);
923                         nxt->flags |= REQ_F_LINK;
924                 }
925
926                 *nxtptr = nxt;
927                 break;
928         }
929
930         req->flags |= REQ_F_LINK_NEXT;
931         if (wake_ev)
932                 io_cqring_ev_posted(ctx);
933 }
934
935 /*
936  * Called if REQ_F_LINK is set, and we fail the head request
937  */
938 static void io_fail_links(struct io_kiocb *req)
939 {
940         struct io_ring_ctx *ctx = req->ctx;
941         struct io_kiocb *link;
942         unsigned long flags;
943
944         spin_lock_irqsave(&ctx->completion_lock, flags);
945
946         while (!list_empty(&req->link_list)) {
947                 link = list_first_entry(&req->link_list, struct io_kiocb, list);
948                 list_del_init(&link->list);
949
950                 trace_io_uring_fail_link(req, link);
951
952                 if ((req->flags & REQ_F_LINK_TIMEOUT) &&
953                     link->sqe->opcode == IORING_OP_LINK_TIMEOUT) {
954                         io_link_cancel_timeout(link);
955                 } else {
956                         io_cqring_fill_event(link, -ECANCELED);
957                         __io_double_put_req(link);
958                 }
959                 req->flags &= ~REQ_F_LINK_TIMEOUT;
960         }
961
962         io_commit_cqring(ctx);
963         spin_unlock_irqrestore(&ctx->completion_lock, flags);
964         io_cqring_ev_posted(ctx);
965 }
966
967 static void io_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
968 {
969         if (likely(!(req->flags & REQ_F_LINK)))
970                 return;
971
972         /*
973          * If LINK is set, we have dependent requests in this chain. If we
974          * didn't fail this request, queue the first one up, moving any other
975          * dependencies to the next request. In case of failure, fail the rest
976          * of the chain.
977          */
978         if (req->flags & REQ_F_FAIL_LINK) {
979                 io_fail_links(req);
980         } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) ==
981                         REQ_F_LINK_TIMEOUT) {
982                 struct io_ring_ctx *ctx = req->ctx;
983                 unsigned long flags;
984
985                 /*
986                  * If this is a timeout link, we could be racing with the
987                  * timeout timer. Grab the completion lock for this case to
988                  * protect against that.
989                  */
990                 spin_lock_irqsave(&ctx->completion_lock, flags);
991                 io_req_link_next(req, nxt);
992                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
993         } else {
994                 io_req_link_next(req, nxt);
995         }
996 }
997
998 static void io_free_req(struct io_kiocb *req)
999 {
1000         struct io_kiocb *nxt = NULL;
1001
1002         io_req_find_next(req, &nxt);
1003         __io_free_req(req);
1004
1005         if (nxt)
1006                 io_queue_async_work(nxt);
1007 }
1008
1009 /*
1010  * Drop reference to request, return next in chain (if there is one) if this
1011  * was the last reference to this request.
1012  */
1013 __attribute__((nonnull))
1014 static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1015 {
1016         io_req_find_next(req, nxtptr);
1017
1018         if (refcount_dec_and_test(&req->refs))
1019                 __io_free_req(req);
1020 }
1021
1022 static void io_put_req(struct io_kiocb *req)
1023 {
1024         if (refcount_dec_and_test(&req->refs))
1025                 io_free_req(req);
1026 }
1027
1028 /*
1029  * Must only be used if we don't need to care about links, usually from
1030  * within the completion handling itself.
1031  */
1032 static void __io_double_put_req(struct io_kiocb *req)
1033 {
1034         /* drop both submit and complete references */
1035         if (refcount_sub_and_test(2, &req->refs))
1036                 __io_free_req(req);
1037 }
1038
1039 static void io_double_put_req(struct io_kiocb *req)
1040 {
1041         /* drop both submit and complete references */
1042         if (refcount_sub_and_test(2, &req->refs))
1043                 io_free_req(req);
1044 }
1045
1046 static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
1047 {
1048         struct io_rings *rings = ctx->rings;
1049
1050         /*
1051          * noflush == true is from the waitqueue handler, just ensure we wake
1052          * up the task, and the next invocation will flush the entries. We
1053          * cannot safely to it from here.
1054          */
1055         if (noflush && !list_empty(&ctx->cq_overflow_list))
1056                 return -1U;
1057
1058         io_cqring_overflow_flush(ctx, false);
1059
1060         /* See comment at the top of this file */
1061         smp_rmb();
1062         return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
1063 }
1064
1065 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
1066 {
1067         struct io_rings *rings = ctx->rings;
1068
1069         /* make sure SQ entry isn't read before tail */
1070         return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
1071 }
1072
1073 /*
1074  * Find and free completed poll iocbs
1075  */
1076 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
1077                                struct list_head *done)
1078 {
1079         void *reqs[IO_IOPOLL_BATCH];
1080         struct io_kiocb *req;
1081         int to_free;
1082
1083         to_free = 0;
1084         while (!list_empty(done)) {
1085                 req = list_first_entry(done, struct io_kiocb, list);
1086                 list_del(&req->list);
1087
1088                 io_cqring_fill_event(req, req->result);
1089                 (*nr_events)++;
1090
1091                 if (refcount_dec_and_test(&req->refs)) {
1092                         /* If we're not using fixed files, we have to pair the
1093                          * completion part with the file put. Use regular
1094                          * completions for those, only batch free for fixed
1095                          * file and non-linked commands.
1096                          */
1097                         if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
1098                             REQ_F_FIXED_FILE) && !io_is_fallback_req(req) &&
1099                             !req->io) {
1100                                 reqs[to_free++] = req;
1101                                 if (to_free == ARRAY_SIZE(reqs))
1102                                         io_free_req_many(ctx, reqs, &to_free);
1103                         } else {
1104                                 io_free_req(req);
1105                         }
1106                 }
1107         }
1108
1109         io_commit_cqring(ctx);
1110         io_free_req_many(ctx, reqs, &to_free);
1111 }
1112
1113 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
1114                         long min)
1115 {
1116         struct io_kiocb *req, *tmp;
1117         LIST_HEAD(done);
1118         bool spin;
1119         int ret;
1120
1121         /*
1122          * Only spin for completions if we don't have multiple devices hanging
1123          * off our complete list, and we're under the requested amount.
1124          */
1125         spin = !ctx->poll_multi_file && *nr_events < min;
1126
1127         ret = 0;
1128         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
1129                 struct kiocb *kiocb = &req->rw;
1130
1131                 /*
1132                  * Move completed entries to our local list. If we find a
1133                  * request that requires polling, break out and complete
1134                  * the done list first, if we have entries there.
1135                  */
1136                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
1137                         list_move_tail(&req->list, &done);
1138                         continue;
1139                 }
1140                 if (!list_empty(&done))
1141                         break;
1142
1143                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
1144                 if (ret < 0)
1145                         break;
1146
1147                 if (ret && spin)
1148                         spin = false;
1149                 ret = 0;
1150         }
1151
1152         if (!list_empty(&done))
1153                 io_iopoll_complete(ctx, nr_events, &done);
1154
1155         return ret;
1156 }
1157
1158 /*
1159  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
1160  * non-spinning poll check - we'll still enter the driver poll loop, but only
1161  * as a non-spinning completion check.
1162  */
1163 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
1164                                 long min)
1165 {
1166         while (!list_empty(&ctx->poll_list) && !need_resched()) {
1167                 int ret;
1168
1169                 ret = io_do_iopoll(ctx, nr_events, min);
1170                 if (ret < 0)
1171                         return ret;
1172                 if (!min || *nr_events >= min)
1173                         return 0;
1174         }
1175
1176         return 1;
1177 }
1178
1179 /*
1180  * We can't just wait for polled events to come to us, we have to actively
1181  * find and complete them.
1182  */
1183 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
1184 {
1185         if (!(ctx->flags & IORING_SETUP_IOPOLL))
1186                 return;
1187
1188         mutex_lock(&ctx->uring_lock);
1189         while (!list_empty(&ctx->poll_list)) {
1190                 unsigned int nr_events = 0;
1191
1192                 io_iopoll_getevents(ctx, &nr_events, 1);
1193
1194                 /*
1195                  * Ensure we allow local-to-the-cpu processing to take place,
1196                  * in this case we need to ensure that we reap all events.
1197                  */
1198                 cond_resched();
1199         }
1200         mutex_unlock(&ctx->uring_lock);
1201 }
1202
1203 static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1204                             long min)
1205 {
1206         int iters = 0, ret = 0;
1207
1208         do {
1209                 int tmin = 0;
1210
1211                 /*
1212                  * Don't enter poll loop if we already have events pending.
1213                  * If we do, we can potentially be spinning for commands that
1214                  * already triggered a CQE (eg in error).
1215                  */
1216                 if (io_cqring_events(ctx, false))
1217                         break;
1218
1219                 /*
1220                  * If a submit got punted to a workqueue, we can have the
1221                  * application entering polling for a command before it gets
1222                  * issued. That app will hold the uring_lock for the duration
1223                  * of the poll right here, so we need to take a breather every
1224                  * now and then to ensure that the issue has a chance to add
1225                  * the poll to the issued list. Otherwise we can spin here
1226                  * forever, while the workqueue is stuck trying to acquire the
1227                  * very same mutex.
1228                  */
1229                 if (!(++iters & 7)) {
1230                         mutex_unlock(&ctx->uring_lock);
1231                         mutex_lock(&ctx->uring_lock);
1232                 }
1233
1234                 if (*nr_events < min)
1235                         tmin = min - *nr_events;
1236
1237                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
1238                 if (ret <= 0)
1239                         break;
1240                 ret = 0;
1241         } while (min && !*nr_events && !need_resched());
1242
1243         return ret;
1244 }
1245
1246 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1247                            long min)
1248 {
1249         int ret;
1250
1251         /*
1252          * We disallow the app entering submit/complete with polling, but we
1253          * still need to lock the ring to prevent racing with polled issue
1254          * that got punted to a workqueue.
1255          */
1256         mutex_lock(&ctx->uring_lock);
1257         ret = __io_iopoll_check(ctx, nr_events, min);
1258         mutex_unlock(&ctx->uring_lock);
1259         return ret;
1260 }
1261
1262 static void kiocb_end_write(struct io_kiocb *req)
1263 {
1264         /*
1265          * Tell lockdep we inherited freeze protection from submission
1266          * thread.
1267          */
1268         if (req->flags & REQ_F_ISREG) {
1269                 struct inode *inode = file_inode(req->file);
1270
1271                 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
1272         }
1273         file_end_write(req->file);
1274 }
1275
1276 static void io_complete_rw_common(struct kiocb *kiocb, long res)
1277 {
1278         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1279
1280         if (kiocb->ki_flags & IOCB_WRITE)
1281                 kiocb_end_write(req);
1282
1283         if ((req->flags & REQ_F_LINK) && res != req->result)
1284                 req->flags |= REQ_F_FAIL_LINK;
1285         io_cqring_add_event(req, res);
1286 }
1287
1288 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
1289 {
1290         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1291
1292         io_complete_rw_common(kiocb, res);
1293         io_put_req(req);
1294 }
1295
1296 static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res)
1297 {
1298         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1299         struct io_kiocb *nxt = NULL;
1300
1301         io_complete_rw_common(kiocb, res);
1302         io_put_req_find_next(req, &nxt);
1303
1304         return nxt;
1305 }
1306
1307 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
1308 {
1309         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1310
1311         if (kiocb->ki_flags & IOCB_WRITE)
1312                 kiocb_end_write(req);
1313
1314         if ((req->flags & REQ_F_LINK) && res != req->result)
1315                 req->flags |= REQ_F_FAIL_LINK;
1316         req->result = res;
1317         if (res != -EAGAIN)
1318                 req->flags |= REQ_F_IOPOLL_COMPLETED;
1319 }
1320
1321 /*
1322  * After the iocb has been issued, it's safe to be found on the poll list.
1323  * Adding the kiocb to the list AFTER submission ensures that we don't
1324  * find it from a io_iopoll_getevents() thread before the issuer is done
1325  * accessing the kiocb cookie.
1326  */
1327 static void io_iopoll_req_issued(struct io_kiocb *req)
1328 {
1329         struct io_ring_ctx *ctx = req->ctx;
1330
1331         /*
1332          * Track whether we have multiple files in our lists. This will impact
1333          * how we do polling eventually, not spinning if we're on potentially
1334          * different devices.
1335          */
1336         if (list_empty(&ctx->poll_list)) {
1337                 ctx->poll_multi_file = false;
1338         } else if (!ctx->poll_multi_file) {
1339                 struct io_kiocb *list_req;
1340
1341                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1342                                                 list);
1343                 if (list_req->rw.ki_filp != req->rw.ki_filp)
1344                         ctx->poll_multi_file = true;
1345         }
1346
1347         /*
1348          * For fast devices, IO may have already completed. If it has, add
1349          * it to the front so we find it first.
1350          */
1351         if (req->flags & REQ_F_IOPOLL_COMPLETED)
1352                 list_add(&req->list, &ctx->poll_list);
1353         else
1354                 list_add_tail(&req->list, &ctx->poll_list);
1355 }
1356
1357 static void io_file_put(struct io_submit_state *state)
1358 {
1359         if (state->file) {
1360                 int diff = state->has_refs - state->used_refs;
1361
1362                 if (diff)
1363                         fput_many(state->file, diff);
1364                 state->file = NULL;
1365         }
1366 }
1367
1368 /*
1369  * Get as many references to a file as we have IOs left in this submission,
1370  * assuming most submissions are for one file, or at least that each file
1371  * has more than one submission.
1372  */
1373 static struct file *io_file_get(struct io_submit_state *state, int fd)
1374 {
1375         if (!state)
1376                 return fget(fd);
1377
1378         if (state->file) {
1379                 if (state->fd == fd) {
1380                         state->used_refs++;
1381                         state->ios_left--;
1382                         return state->file;
1383                 }
1384                 io_file_put(state);
1385         }
1386         state->file = fget_many(fd, state->ios_left);
1387         if (!state->file)
1388                 return NULL;
1389
1390         state->fd = fd;
1391         state->has_refs = state->ios_left;
1392         state->used_refs = 1;
1393         state->ios_left--;
1394         return state->file;
1395 }
1396
1397 /*
1398  * If we tracked the file through the SCM inflight mechanism, we could support
1399  * any file. For now, just ensure that anything potentially problematic is done
1400  * inline.
1401  */
1402 static bool io_file_supports_async(struct file *file)
1403 {
1404         umode_t mode = file_inode(file)->i_mode;
1405
1406         if (S_ISBLK(mode) || S_ISCHR(mode))
1407                 return true;
1408         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1409                 return true;
1410
1411         return false;
1412 }
1413
1414 static int io_prep_rw(struct io_kiocb *req, bool force_nonblock)
1415 {
1416         const struct io_uring_sqe *sqe = req->sqe;
1417         struct io_ring_ctx *ctx = req->ctx;
1418         struct kiocb *kiocb = &req->rw;
1419         unsigned ioprio;
1420         int ret;
1421
1422         if (!req->file)
1423                 return -EBADF;
1424
1425         if (S_ISREG(file_inode(req->file)->i_mode))
1426                 req->flags |= REQ_F_ISREG;
1427
1428         kiocb->ki_pos = READ_ONCE(sqe->off);
1429         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1430         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1431
1432         ioprio = READ_ONCE(sqe->ioprio);
1433         if (ioprio) {
1434                 ret = ioprio_check_cap(ioprio);
1435                 if (ret)
1436                         return ret;
1437
1438                 kiocb->ki_ioprio = ioprio;
1439         } else
1440                 kiocb->ki_ioprio = get_current_ioprio();
1441
1442         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1443         if (unlikely(ret))
1444                 return ret;
1445
1446         /* don't allow async punt if RWF_NOWAIT was requested */
1447         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1448             (req->file->f_flags & O_NONBLOCK))
1449                 req->flags |= REQ_F_NOWAIT;
1450
1451         if (force_nonblock)
1452                 kiocb->ki_flags |= IOCB_NOWAIT;
1453
1454         if (ctx->flags & IORING_SETUP_IOPOLL) {
1455                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1456                     !kiocb->ki_filp->f_op->iopoll)
1457                         return -EOPNOTSUPP;
1458
1459                 kiocb->ki_flags |= IOCB_HIPRI;
1460                 kiocb->ki_complete = io_complete_rw_iopoll;
1461                 req->result = 0;
1462         } else {
1463                 if (kiocb->ki_flags & IOCB_HIPRI)
1464                         return -EINVAL;
1465                 kiocb->ki_complete = io_complete_rw;
1466         }
1467         return 0;
1468 }
1469
1470 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1471 {
1472         switch (ret) {
1473         case -EIOCBQUEUED:
1474                 break;
1475         case -ERESTARTSYS:
1476         case -ERESTARTNOINTR:
1477         case -ERESTARTNOHAND:
1478         case -ERESTART_RESTARTBLOCK:
1479                 /*
1480                  * We can't just restart the syscall, since previously
1481                  * submitted sqes may already be in progress. Just fail this
1482                  * IO with EINTR.
1483                  */
1484                 ret = -EINTR;
1485                 /* fall through */
1486         default:
1487                 kiocb->ki_complete(kiocb, ret, 0);
1488         }
1489 }
1490
1491 static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt,
1492                        bool in_async)
1493 {
1494         if (in_async && ret >= 0 && kiocb->ki_complete == io_complete_rw)
1495                 *nxt = __io_complete_rw(kiocb, ret);
1496         else
1497                 io_rw_done(kiocb, ret);
1498 }
1499
1500 static ssize_t io_import_fixed(struct io_ring_ctx *ctx, int rw,
1501                                const struct io_uring_sqe *sqe,
1502                                struct iov_iter *iter)
1503 {
1504         size_t len = READ_ONCE(sqe->len);
1505         struct io_mapped_ubuf *imu;
1506         unsigned index, buf_index;
1507         size_t offset;
1508         u64 buf_addr;
1509
1510         /* attempt to use fixed buffers without having provided iovecs */
1511         if (unlikely(!ctx->user_bufs))
1512                 return -EFAULT;
1513
1514         buf_index = READ_ONCE(sqe->buf_index);
1515         if (unlikely(buf_index >= ctx->nr_user_bufs))
1516                 return -EFAULT;
1517
1518         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1519         imu = &ctx->user_bufs[index];
1520         buf_addr = READ_ONCE(sqe->addr);
1521
1522         /* overflow */
1523         if (buf_addr + len < buf_addr)
1524                 return -EFAULT;
1525         /* not inside the mapped region */
1526         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1527                 return -EFAULT;
1528
1529         /*
1530          * May not be a start of buffer, set size appropriately
1531          * and advance us to the beginning.
1532          */
1533         offset = buf_addr - imu->ubuf;
1534         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1535
1536         if (offset) {
1537                 /*
1538                  * Don't use iov_iter_advance() here, as it's really slow for
1539                  * using the latter parts of a big fixed buffer - it iterates
1540                  * over each segment manually. We can cheat a bit here, because
1541                  * we know that:
1542                  *
1543                  * 1) it's a BVEC iter, we set it up
1544                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1545                  *    first and last bvec
1546                  *
1547                  * So just find our index, and adjust the iterator afterwards.
1548                  * If the offset is within the first bvec (or the whole first
1549                  * bvec, just use iov_iter_advance(). This makes it easier
1550                  * since we can just skip the first segment, which may not
1551                  * be PAGE_SIZE aligned.
1552                  */
1553                 const struct bio_vec *bvec = imu->bvec;
1554
1555                 if (offset <= bvec->bv_len) {
1556                         iov_iter_advance(iter, offset);
1557                 } else {
1558                         unsigned long seg_skip;
1559
1560                         /* skip first vec */
1561                         offset -= bvec->bv_len;
1562                         seg_skip = 1 + (offset >> PAGE_SHIFT);
1563
1564                         iter->bvec = bvec + seg_skip;
1565                         iter->nr_segs -= seg_skip;
1566                         iter->count -= bvec->bv_len + offset;
1567                         iter->iov_offset = offset & ~PAGE_MASK;
1568                 }
1569         }
1570
1571         return len;
1572 }
1573
1574 static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
1575                                struct iovec **iovec, struct iov_iter *iter)
1576 {
1577         const struct io_uring_sqe *sqe = req->sqe;
1578         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1579         size_t sqe_len = READ_ONCE(sqe->len);
1580         u8 opcode;
1581
1582         /*
1583          * We're reading ->opcode for the second time, but the first read
1584          * doesn't care whether it's _FIXED or not, so it doesn't matter
1585          * whether ->opcode changes concurrently. The first read does care
1586          * about whether it is a READ or a WRITE, so we don't trust this read
1587          * for that purpose and instead let the caller pass in the read/write
1588          * flag.
1589          */
1590         opcode = READ_ONCE(sqe->opcode);
1591         if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
1592                 *iovec = NULL;
1593                 return io_import_fixed(req->ctx, rw, sqe, iter);
1594         }
1595
1596         if (req->io) {
1597                 struct io_async_rw *iorw = &req->io->rw;
1598
1599                 *iovec = iorw->iov;
1600                 iov_iter_init(iter, rw, *iovec, iorw->nr_segs, iorw->size);
1601                 if (iorw->iov == iorw->fast_iov)
1602                         *iovec = NULL;
1603                 return iorw->size;
1604         }
1605
1606         if (!req->has_user)
1607                 return -EFAULT;
1608
1609 #ifdef CONFIG_COMPAT
1610         if (req->ctx->compat)
1611                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1612                                                 iovec, iter);
1613 #endif
1614
1615         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1616 }
1617
1618 /*
1619  * For files that don't have ->read_iter() and ->write_iter(), handle them
1620  * by looping over ->read() or ->write() manually.
1621  */
1622 static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
1623                            struct iov_iter *iter)
1624 {
1625         ssize_t ret = 0;
1626
1627         /*
1628          * Don't support polled IO through this interface, and we can't
1629          * support non-blocking either. For the latter, this just causes
1630          * the kiocb to be handled from an async context.
1631          */
1632         if (kiocb->ki_flags & IOCB_HIPRI)
1633                 return -EOPNOTSUPP;
1634         if (kiocb->ki_flags & IOCB_NOWAIT)
1635                 return -EAGAIN;
1636
1637         while (iov_iter_count(iter)) {
1638                 struct iovec iovec;
1639                 ssize_t nr;
1640
1641                 if (!iov_iter_is_bvec(iter)) {
1642                         iovec = iov_iter_iovec(iter);
1643                 } else {
1644                         /* fixed buffers import bvec */
1645                         iovec.iov_base = kmap(iter->bvec->bv_page)
1646                                                 + iter->iov_offset;
1647                         iovec.iov_len = min(iter->count,
1648                                         iter->bvec->bv_len - iter->iov_offset);
1649                 }
1650
1651                 if (rw == READ) {
1652                         nr = file->f_op->read(file, iovec.iov_base,
1653                                               iovec.iov_len, &kiocb->ki_pos);
1654                 } else {
1655                         nr = file->f_op->write(file, iovec.iov_base,
1656                                                iovec.iov_len, &kiocb->ki_pos);
1657                 }
1658
1659                 if (iov_iter_is_bvec(iter))
1660                         kunmap(iter->bvec->bv_page);
1661
1662                 if (nr < 0) {
1663                         if (!ret)
1664                                 ret = nr;
1665                         break;
1666                 }
1667                 ret += nr;
1668                 if (nr != iovec.iov_len)
1669                         break;
1670                 iov_iter_advance(iter, nr);
1671         }
1672
1673         return ret;
1674 }
1675
1676 static void io_req_map_io(struct io_kiocb *req, ssize_t io_size,
1677                           struct iovec *iovec, struct iovec *fast_iov,
1678                           struct iov_iter *iter)
1679 {
1680         req->io->rw.nr_segs = iter->nr_segs;
1681         req->io->rw.size = io_size;
1682         req->io->rw.iov = iovec;
1683         if (!req->io->rw.iov) {
1684                 req->io->rw.iov = req->io->rw.fast_iov;
1685                 memcpy(req->io->rw.iov, fast_iov,
1686                         sizeof(struct iovec) * iter->nr_segs);
1687         }
1688 }
1689
1690 static int io_setup_async_io(struct io_kiocb *req, ssize_t io_size,
1691                              struct iovec *iovec, struct iovec *fast_iov,
1692                              struct iov_iter *iter)
1693 {
1694         req->io = kmalloc(sizeof(*req->io), GFP_KERNEL);
1695         if (req->io) {
1696                 io_req_map_io(req, io_size, iovec, fast_iov, iter);
1697                 memcpy(&req->io->sqe, req->sqe, sizeof(req->io->sqe));
1698                 req->sqe = &req->io->sqe;
1699                 return 0;
1700         }
1701
1702         return -ENOMEM;
1703 }
1704
1705 static int io_read_prep(struct io_kiocb *req, struct iovec **iovec,
1706                         struct iov_iter *iter, bool force_nonblock)
1707 {
1708         ssize_t ret;
1709
1710         ret = io_prep_rw(req, force_nonblock);
1711         if (ret)
1712                 return ret;
1713
1714         if (unlikely(!(req->file->f_mode & FMODE_READ)))
1715                 return -EBADF;
1716
1717         return io_import_iovec(READ, req, iovec, iter);
1718 }
1719
1720 static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
1721                    bool force_nonblock)
1722 {
1723         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1724         struct kiocb *kiocb = &req->rw;
1725         struct iov_iter iter;
1726         struct file *file;
1727         size_t iov_count;
1728         ssize_t io_size, ret;
1729
1730         if (!req->io) {
1731                 ret = io_read_prep(req, &iovec, &iter, force_nonblock);
1732                 if (ret < 0)
1733                         return ret;
1734         } else {
1735                 ret = io_import_iovec(READ, req, &iovec, &iter);
1736                 if (ret < 0)
1737                         return ret;
1738         }
1739
1740         file = req->file;
1741         io_size = ret;
1742         if (req->flags & REQ_F_LINK)
1743                 req->result = io_size;
1744
1745         /*
1746          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1747          * we know to async punt it even if it was opened O_NONBLOCK
1748          */
1749         if (force_nonblock && !io_file_supports_async(file)) {
1750                 req->flags |= REQ_F_MUST_PUNT;
1751                 goto copy_iov;
1752         }
1753
1754         iov_count = iov_iter_count(&iter);
1755         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1756         if (!ret) {
1757                 ssize_t ret2;
1758
1759                 if (file->f_op->read_iter)
1760                         ret2 = call_read_iter(file, kiocb, &iter);
1761                 else
1762                         ret2 = loop_rw_iter(READ, file, kiocb, &iter);
1763
1764                 /*
1765                  * In case of a short read, punt to async. This can happen
1766                  * if we have data partially cached. Alternatively we can
1767                  * return the short read, in which case the application will
1768                  * need to issue another SQE and wait for it. That SQE will
1769                  * need async punt anyway, so it's more efficient to do it
1770                  * here.
1771                  */
1772                 if (force_nonblock && !(req->flags & REQ_F_NOWAIT) &&
1773                     (req->flags & REQ_F_ISREG) &&
1774                     ret2 > 0 && ret2 < io_size)
1775                         ret2 = -EAGAIN;
1776                 /* Catch -EAGAIN return for forced non-blocking submission */
1777                 if (!force_nonblock || ret2 != -EAGAIN) {
1778                         kiocb_done(kiocb, ret2, nxt, req->in_async);
1779                 } else {
1780 copy_iov:
1781                         ret = io_setup_async_io(req, io_size, iovec,
1782                                                 inline_vecs, &iter);
1783                         if (ret)
1784                                 goto out_free;
1785                         return -EAGAIN;
1786                 }
1787         }
1788 out_free:
1789         kfree(iovec);
1790         return ret;
1791 }
1792
1793 static int io_write_prep(struct io_kiocb *req, struct iovec **iovec,
1794                          struct iov_iter *iter, bool force_nonblock)
1795 {
1796         ssize_t ret;
1797
1798         ret = io_prep_rw(req, force_nonblock);
1799         if (ret)
1800                 return ret;
1801
1802         if (unlikely(!(req->file->f_mode & FMODE_WRITE)))
1803                 return -EBADF;
1804
1805         return io_import_iovec(WRITE, req, iovec, iter);
1806 }
1807
1808 static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
1809                     bool force_nonblock)
1810 {
1811         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1812         struct kiocb *kiocb = &req->rw;
1813         struct iov_iter iter;
1814         struct file *file;
1815         size_t iov_count;
1816         ssize_t ret, io_size;
1817
1818         if (!req->io) {
1819                 ret = io_write_prep(req, &iovec, &iter, force_nonblock);
1820                 if (ret < 0)
1821                         return ret;
1822         } else {
1823                 ret = io_import_iovec(WRITE, req, &iovec, &iter);
1824                 if (ret < 0)
1825                         return ret;
1826         }
1827
1828         file = kiocb->ki_filp;
1829         io_size = ret;
1830         if (req->flags & REQ_F_LINK)
1831                 req->result = io_size;
1832
1833         /*
1834          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1835          * we know to async punt it even if it was opened O_NONBLOCK
1836          */
1837         if (force_nonblock && !io_file_supports_async(req->file)) {
1838                 req->flags |= REQ_F_MUST_PUNT;
1839                 goto copy_iov;
1840         }
1841
1842         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT))
1843                 goto copy_iov;
1844
1845         iov_count = iov_iter_count(&iter);
1846         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1847         if (!ret) {
1848                 ssize_t ret2;
1849
1850                 /*
1851                  * Open-code file_start_write here to grab freeze protection,
1852                  * which will be released by another thread in
1853                  * io_complete_rw().  Fool lockdep by telling it the lock got
1854                  * released so that it doesn't complain about the held lock when
1855                  * we return to userspace.
1856                  */
1857                 if (req->flags & REQ_F_ISREG) {
1858                         __sb_start_write(file_inode(file)->i_sb,
1859                                                 SB_FREEZE_WRITE, true);
1860                         __sb_writers_release(file_inode(file)->i_sb,
1861                                                 SB_FREEZE_WRITE);
1862                 }
1863                 kiocb->ki_flags |= IOCB_WRITE;
1864
1865                 if (file->f_op->write_iter)
1866                         ret2 = call_write_iter(file, kiocb, &iter);
1867                 else
1868                         ret2 = loop_rw_iter(WRITE, file, kiocb, &iter);
1869                 if (!force_nonblock || ret2 != -EAGAIN) {
1870                         kiocb_done(kiocb, ret2, nxt, req->in_async);
1871                 } else {
1872 copy_iov:
1873                         ret = io_setup_async_io(req, io_size, iovec,
1874                                                 inline_vecs, &iter);
1875                         if (ret)
1876                                 goto out_free;
1877                         return -EAGAIN;
1878                 }
1879         }
1880 out_free:
1881         kfree(iovec);
1882         return ret;
1883 }
1884
1885 /*
1886  * IORING_OP_NOP just posts a completion event, nothing else.
1887  */
1888 static int io_nop(struct io_kiocb *req)
1889 {
1890         struct io_ring_ctx *ctx = req->ctx;
1891
1892         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1893                 return -EINVAL;
1894
1895         io_cqring_add_event(req, 0);
1896         io_put_req(req);
1897         return 0;
1898 }
1899
1900 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1901 {
1902         struct io_ring_ctx *ctx = req->ctx;
1903
1904         if (!req->file)
1905                 return -EBADF;
1906
1907         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1908                 return -EINVAL;
1909         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1910                 return -EINVAL;
1911
1912         return 0;
1913 }
1914
1915 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1916                     struct io_kiocb **nxt, bool force_nonblock)
1917 {
1918         loff_t sqe_off = READ_ONCE(sqe->off);
1919         loff_t sqe_len = READ_ONCE(sqe->len);
1920         loff_t end = sqe_off + sqe_len;
1921         unsigned fsync_flags;
1922         int ret;
1923
1924         fsync_flags = READ_ONCE(sqe->fsync_flags);
1925         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1926                 return -EINVAL;
1927
1928         ret = io_prep_fsync(req, sqe);
1929         if (ret)
1930                 return ret;
1931
1932         /* fsync always requires a blocking context */
1933         if (force_nonblock)
1934                 return -EAGAIN;
1935
1936         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1937                                 end > 0 ? end : LLONG_MAX,
1938                                 fsync_flags & IORING_FSYNC_DATASYNC);
1939
1940         if (ret < 0 && (req->flags & REQ_F_LINK))
1941                 req->flags |= REQ_F_FAIL_LINK;
1942         io_cqring_add_event(req, ret);
1943         io_put_req_find_next(req, nxt);
1944         return 0;
1945 }
1946
1947 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1948 {
1949         struct io_ring_ctx *ctx = req->ctx;
1950         int ret = 0;
1951
1952         if (!req->file)
1953                 return -EBADF;
1954
1955         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1956                 return -EINVAL;
1957         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1958                 return -EINVAL;
1959
1960         return ret;
1961 }
1962
1963 static int io_sync_file_range(struct io_kiocb *req,
1964                               const struct io_uring_sqe *sqe,
1965                               struct io_kiocb **nxt,
1966                               bool force_nonblock)
1967 {
1968         loff_t sqe_off;
1969         loff_t sqe_len;
1970         unsigned flags;
1971         int ret;
1972
1973         ret = io_prep_sfr(req, sqe);
1974         if (ret)
1975                 return ret;
1976
1977         /* sync_file_range always requires a blocking context */
1978         if (force_nonblock)
1979                 return -EAGAIN;
1980
1981         sqe_off = READ_ONCE(sqe->off);
1982         sqe_len = READ_ONCE(sqe->len);
1983         flags = READ_ONCE(sqe->sync_range_flags);
1984
1985         ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1986
1987         if (ret < 0 && (req->flags & REQ_F_LINK))
1988                 req->flags |= REQ_F_FAIL_LINK;
1989         io_cqring_add_event(req, ret);
1990         io_put_req_find_next(req, nxt);
1991         return 0;
1992 }
1993
1994 #if defined(CONFIG_NET)
1995 static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1996                            struct io_kiocb **nxt, bool force_nonblock,
1997                    long (*fn)(struct socket *, struct user_msghdr __user *,
1998                                 unsigned int))
1999 {
2000         struct socket *sock;
2001         int ret;
2002
2003         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2004                 return -EINVAL;
2005
2006         sock = sock_from_file(req->file, &ret);
2007         if (sock) {
2008                 struct user_msghdr __user *msg;
2009                 unsigned flags;
2010
2011                 flags = READ_ONCE(sqe->msg_flags);
2012                 if (flags & MSG_DONTWAIT)
2013                         req->flags |= REQ_F_NOWAIT;
2014                 else if (force_nonblock)
2015                         flags |= MSG_DONTWAIT;
2016
2017                 msg = (struct user_msghdr __user *) (unsigned long)
2018                         READ_ONCE(sqe->addr);
2019
2020                 ret = fn(sock, msg, flags);
2021                 if (force_nonblock && ret == -EAGAIN)
2022                         return ret;
2023                 if (ret == -ERESTARTSYS)
2024                         ret = -EINTR;
2025         }
2026
2027         io_cqring_add_event(req, ret);
2028         if (ret < 0 && (req->flags & REQ_F_LINK))
2029                 req->flags |= REQ_F_FAIL_LINK;
2030         io_put_req_find_next(req, nxt);
2031         return 0;
2032 }
2033 #endif
2034
2035 static int io_sendmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2036                       struct io_kiocb **nxt, bool force_nonblock)
2037 {
2038 #if defined(CONFIG_NET)
2039         return io_send_recvmsg(req, sqe, nxt, force_nonblock,
2040                                 __sys_sendmsg_sock);
2041 #else
2042         return -EOPNOTSUPP;
2043 #endif
2044 }
2045
2046 static int io_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2047                       struct io_kiocb **nxt, bool force_nonblock)
2048 {
2049 #if defined(CONFIG_NET)
2050         return io_send_recvmsg(req, sqe, nxt, force_nonblock,
2051                                 __sys_recvmsg_sock);
2052 #else
2053         return -EOPNOTSUPP;
2054 #endif
2055 }
2056
2057 static int io_accept(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2058                      struct io_kiocb **nxt, bool force_nonblock)
2059 {
2060 #if defined(CONFIG_NET)
2061         struct sockaddr __user *addr;
2062         int __user *addr_len;
2063         unsigned file_flags;
2064         int flags, ret;
2065
2066         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
2067                 return -EINVAL;
2068         if (sqe->ioprio || sqe->len || sqe->buf_index)
2069                 return -EINVAL;
2070
2071         addr = (struct sockaddr __user *) (unsigned long) READ_ONCE(sqe->addr);
2072         addr_len = (int __user *) (unsigned long) READ_ONCE(sqe->addr2);
2073         flags = READ_ONCE(sqe->accept_flags);
2074         file_flags = force_nonblock ? O_NONBLOCK : 0;
2075
2076         ret = __sys_accept4_file(req->file, file_flags, addr, addr_len, flags);
2077         if (ret == -EAGAIN && force_nonblock) {
2078                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2079                 return -EAGAIN;
2080         }
2081         if (ret == -ERESTARTSYS)
2082                 ret = -EINTR;
2083         if (ret < 0 && (req->flags & REQ_F_LINK))
2084                 req->flags |= REQ_F_FAIL_LINK;
2085         io_cqring_add_event(req, ret);
2086         io_put_req_find_next(req, nxt);
2087         return 0;
2088 #else
2089         return -EOPNOTSUPP;
2090 #endif
2091 }
2092
2093 static int io_connect(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2094                       struct io_kiocb **nxt, bool force_nonblock)
2095 {
2096 #if defined(CONFIG_NET)
2097         struct sockaddr __user *addr;
2098         unsigned file_flags;
2099         int addr_len, ret;
2100
2101         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
2102                 return -EINVAL;
2103         if (sqe->ioprio || sqe->len || sqe->buf_index || sqe->rw_flags)
2104                 return -EINVAL;
2105
2106         addr = (struct sockaddr __user *) (unsigned long) READ_ONCE(sqe->addr);
2107         addr_len = READ_ONCE(sqe->addr2);
2108         file_flags = force_nonblock ? O_NONBLOCK : 0;
2109
2110         ret = __sys_connect_file(req->file, addr, addr_len, file_flags);
2111         if (ret == -EAGAIN && force_nonblock)
2112                 return -EAGAIN;
2113         if (ret == -ERESTARTSYS)
2114                 ret = -EINTR;
2115         if (ret < 0 && (req->flags & REQ_F_LINK))
2116                 req->flags |= REQ_F_FAIL_LINK;
2117         io_cqring_add_event(req, ret);
2118         io_put_req_find_next(req, nxt);
2119         return 0;
2120 #else
2121         return -EOPNOTSUPP;
2122 #endif
2123 }
2124
2125 static inline void io_poll_remove_req(struct io_kiocb *req)
2126 {
2127         if (!RB_EMPTY_NODE(&req->rb_node)) {
2128                 rb_erase(&req->rb_node, &req->ctx->cancel_tree);
2129                 RB_CLEAR_NODE(&req->rb_node);
2130         }
2131 }
2132
2133 static void io_poll_remove_one(struct io_kiocb *req)
2134 {
2135         struct io_poll_iocb *poll = &req->poll;
2136
2137         spin_lock(&poll->head->lock);
2138         WRITE_ONCE(poll->canceled, true);
2139         if (!list_empty(&poll->wait->entry)) {
2140                 list_del_init(&poll->wait->entry);
2141                 io_queue_async_work(req);
2142         }
2143         spin_unlock(&poll->head->lock);
2144         io_poll_remove_req(req);
2145 }
2146
2147 static void io_poll_remove_all(struct io_ring_ctx *ctx)
2148 {
2149         struct rb_node *node;
2150         struct io_kiocb *req;
2151
2152         spin_lock_irq(&ctx->completion_lock);
2153         while ((node = rb_first(&ctx->cancel_tree)) != NULL) {
2154                 req = rb_entry(node, struct io_kiocb, rb_node);
2155                 io_poll_remove_one(req);
2156         }
2157         spin_unlock_irq(&ctx->completion_lock);
2158 }
2159
2160 static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
2161 {
2162         struct rb_node *p, *parent = NULL;
2163         struct io_kiocb *req;
2164
2165         p = ctx->cancel_tree.rb_node;
2166         while (p) {
2167                 parent = p;
2168                 req = rb_entry(parent, struct io_kiocb, rb_node);
2169                 if (sqe_addr < req->user_data) {
2170                         p = p->rb_left;
2171                 } else if (sqe_addr > req->user_data) {
2172                         p = p->rb_right;
2173                 } else {
2174                         io_poll_remove_one(req);
2175                         return 0;
2176                 }
2177         }
2178
2179         return -ENOENT;
2180 }
2181
2182 /*
2183  * Find a running poll command that matches one specified in sqe->addr,
2184  * and remove it if found.
2185  */
2186 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2187 {
2188         struct io_ring_ctx *ctx = req->ctx;
2189         int ret;
2190
2191         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2192                 return -EINVAL;
2193         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
2194             sqe->poll_events)
2195                 return -EINVAL;
2196
2197         spin_lock_irq(&ctx->completion_lock);
2198         ret = io_poll_cancel(ctx, READ_ONCE(sqe->addr));
2199         spin_unlock_irq(&ctx->completion_lock);
2200
2201         io_cqring_add_event(req, ret);
2202         if (ret < 0 && (req->flags & REQ_F_LINK))
2203                 req->flags |= REQ_F_FAIL_LINK;
2204         io_put_req(req);
2205         return 0;
2206 }
2207
2208 static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error)
2209 {
2210         struct io_ring_ctx *ctx = req->ctx;
2211
2212         req->poll.done = true;
2213         kfree(req->poll.wait);
2214         if (error)
2215                 io_cqring_fill_event(req, error);
2216         else
2217                 io_cqring_fill_event(req, mangle_poll(mask));
2218         io_commit_cqring(ctx);
2219 }
2220
2221 static void io_poll_complete_work(struct io_wq_work **workptr)
2222 {
2223         struct io_wq_work *work = *workptr;
2224         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2225         struct io_poll_iocb *poll = &req->poll;
2226         struct poll_table_struct pt = { ._key = poll->events };
2227         struct io_ring_ctx *ctx = req->ctx;
2228         struct io_kiocb *nxt = NULL;
2229         __poll_t mask = 0;
2230         int ret = 0;
2231
2232         if (work->flags & IO_WQ_WORK_CANCEL) {
2233                 WRITE_ONCE(poll->canceled, true);
2234                 ret = -ECANCELED;
2235         } else if (READ_ONCE(poll->canceled)) {
2236                 ret = -ECANCELED;
2237         }
2238
2239         if (ret != -ECANCELED)
2240                 mask = vfs_poll(poll->file, &pt) & poll->events;
2241
2242         /*
2243          * Note that ->ki_cancel callers also delete iocb from active_reqs after
2244          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
2245          * synchronize with them.  In the cancellation case the list_del_init
2246          * itself is not actually needed, but harmless so we keep it in to
2247          * avoid further branches in the fast path.
2248          */
2249         spin_lock_irq(&ctx->completion_lock);
2250         if (!mask && ret != -ECANCELED) {
2251                 add_wait_queue(poll->head, poll->wait);
2252                 spin_unlock_irq(&ctx->completion_lock);
2253                 return;
2254         }
2255         io_poll_remove_req(req);
2256         io_poll_complete(req, mask, ret);
2257         spin_unlock_irq(&ctx->completion_lock);
2258
2259         io_cqring_ev_posted(ctx);
2260
2261         if (ret < 0 && req->flags & REQ_F_LINK)
2262                 req->flags |= REQ_F_FAIL_LINK;
2263         io_put_req_find_next(req, &nxt);
2264         if (nxt)
2265                 *workptr = &nxt->work;
2266 }
2267
2268 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
2269                         void *key)
2270 {
2271         struct io_poll_iocb *poll = wait->private;
2272         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
2273         struct io_ring_ctx *ctx = req->ctx;
2274         __poll_t mask = key_to_poll(key);
2275         unsigned long flags;
2276
2277         /* for instances that support it check for an event match first: */
2278         if (mask && !(mask & poll->events))
2279                 return 0;
2280
2281         list_del_init(&poll->wait->entry);
2282
2283         /*
2284          * Run completion inline if we can. We're using trylock here because
2285          * we are violating the completion_lock -> poll wq lock ordering.
2286          * If we have a link timeout we're going to need the completion_lock
2287          * for finalizing the request, mark us as having grabbed that already.
2288          */
2289         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
2290                 io_poll_remove_req(req);
2291                 io_poll_complete(req, mask, 0);
2292                 req->flags |= REQ_F_COMP_LOCKED;
2293                 io_put_req(req);
2294                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
2295
2296                 io_cqring_ev_posted(ctx);
2297         } else {
2298                 io_queue_async_work(req);
2299         }
2300
2301         return 1;
2302 }
2303
2304 struct io_poll_table {
2305         struct poll_table_struct pt;
2306         struct io_kiocb *req;
2307         int error;
2308 };
2309
2310 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
2311                                struct poll_table_struct *p)
2312 {
2313         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
2314
2315         if (unlikely(pt->req->poll.head)) {
2316                 pt->error = -EINVAL;
2317                 return;
2318         }
2319
2320         pt->error = 0;
2321         pt->req->poll.head = head;
2322         add_wait_queue(head, pt->req->poll.wait);
2323 }
2324
2325 static void io_poll_req_insert(struct io_kiocb *req)
2326 {
2327         struct io_ring_ctx *ctx = req->ctx;
2328         struct rb_node **p = &ctx->cancel_tree.rb_node;
2329         struct rb_node *parent = NULL;
2330         struct io_kiocb *tmp;
2331
2332         while (*p) {
2333                 parent = *p;
2334                 tmp = rb_entry(parent, struct io_kiocb, rb_node);
2335                 if (req->user_data < tmp->user_data)
2336                         p = &(*p)->rb_left;
2337                 else
2338                         p = &(*p)->rb_right;
2339         }
2340         rb_link_node(&req->rb_node, parent, p);
2341         rb_insert_color(&req->rb_node, &ctx->cancel_tree);
2342 }
2343
2344 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2345                        struct io_kiocb **nxt)
2346 {
2347         struct io_poll_iocb *poll = &req->poll;
2348         struct io_ring_ctx *ctx = req->ctx;
2349         struct io_poll_table ipt;
2350         bool cancel = false;
2351         __poll_t mask;
2352         u16 events;
2353
2354         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2355                 return -EINVAL;
2356         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
2357                 return -EINVAL;
2358         if (!poll->file)
2359                 return -EBADF;
2360
2361         poll->wait = kmalloc(sizeof(*poll->wait), GFP_KERNEL);
2362         if (!poll->wait)
2363                 return -ENOMEM;
2364
2365         req->io = NULL;
2366         INIT_IO_WORK(&req->work, io_poll_complete_work);
2367         events = READ_ONCE(sqe->poll_events);
2368         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
2369         RB_CLEAR_NODE(&req->rb_node);
2370
2371         poll->head = NULL;
2372         poll->done = false;
2373         poll->canceled = false;
2374
2375         ipt.pt._qproc = io_poll_queue_proc;
2376         ipt.pt._key = poll->events;
2377         ipt.req = req;
2378         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
2379
2380         /* initialized the list so that we can do list_empty checks */
2381         INIT_LIST_HEAD(&poll->wait->entry);
2382         init_waitqueue_func_entry(poll->wait, io_poll_wake);
2383         poll->wait->private = poll;
2384
2385         INIT_LIST_HEAD(&req->list);
2386
2387         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
2388
2389         spin_lock_irq(&ctx->completion_lock);
2390         if (likely(poll->head)) {
2391                 spin_lock(&poll->head->lock);
2392                 if (unlikely(list_empty(&poll->wait->entry))) {
2393                         if (ipt.error)
2394                                 cancel = true;
2395                         ipt.error = 0;
2396                         mask = 0;
2397                 }
2398                 if (mask || ipt.error)
2399                         list_del_init(&poll->wait->entry);
2400                 else if (cancel)
2401                         WRITE_ONCE(poll->canceled, true);
2402                 else if (!poll->done) /* actually waiting for an event */
2403                         io_poll_req_insert(req);
2404                 spin_unlock(&poll->head->lock);
2405         }
2406         if (mask) { /* no async, we'd stolen it */
2407                 ipt.error = 0;
2408                 io_poll_complete(req, mask, 0);
2409         }
2410         spin_unlock_irq(&ctx->completion_lock);
2411
2412         if (mask) {
2413                 io_cqring_ev_posted(ctx);
2414                 io_put_req_find_next(req, nxt);
2415         }
2416         return ipt.error;
2417 }
2418
2419 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
2420 {
2421         struct io_timeout_data *data = container_of(timer,
2422                                                 struct io_timeout_data, timer);
2423         struct io_kiocb *req = data->req;
2424         struct io_ring_ctx *ctx = req->ctx;
2425         unsigned long flags;
2426
2427         atomic_inc(&ctx->cq_timeouts);
2428
2429         spin_lock_irqsave(&ctx->completion_lock, flags);
2430         /*
2431          * We could be racing with timeout deletion. If the list is empty,
2432          * then timeout lookup already found it and will be handling it.
2433          */
2434         if (!list_empty(&req->list)) {
2435                 struct io_kiocb *prev;
2436
2437                 /*
2438                  * Adjust the reqs sequence before the current one because it
2439                  * will consume a slot in the cq_ring and the the cq_tail
2440                  * pointer will be increased, otherwise other timeout reqs may
2441                  * return in advance without waiting for enough wait_nr.
2442                  */
2443                 prev = req;
2444                 list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
2445                         prev->sequence++;
2446                 list_del_init(&req->list);
2447         }
2448
2449         io_cqring_fill_event(req, -ETIME);
2450         io_commit_cqring(ctx);
2451         spin_unlock_irqrestore(&ctx->completion_lock, flags);
2452
2453         io_cqring_ev_posted(ctx);
2454         if (req->flags & REQ_F_LINK)
2455                 req->flags |= REQ_F_FAIL_LINK;
2456         io_put_req(req);
2457         return HRTIMER_NORESTART;
2458 }
2459
2460 static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
2461 {
2462         struct io_kiocb *req;
2463         int ret = -ENOENT;
2464
2465         list_for_each_entry(req, &ctx->timeout_list, list) {
2466                 if (user_data == req->user_data) {
2467                         list_del_init(&req->list);
2468                         ret = 0;
2469                         break;
2470                 }
2471         }
2472
2473         if (ret == -ENOENT)
2474                 return ret;
2475
2476         ret = hrtimer_try_to_cancel(&req->timeout.data->timer);
2477         if (ret == -1)
2478                 return -EALREADY;
2479
2480         if (req->flags & REQ_F_LINK)
2481                 req->flags |= REQ_F_FAIL_LINK;
2482         io_cqring_fill_event(req, -ECANCELED);
2483         io_put_req(req);
2484         return 0;
2485 }
2486
2487 /*
2488  * Remove or update an existing timeout command
2489  */
2490 static int io_timeout_remove(struct io_kiocb *req,
2491                              const struct io_uring_sqe *sqe)
2492 {
2493         struct io_ring_ctx *ctx = req->ctx;
2494         unsigned flags;
2495         int ret;
2496
2497         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2498                 return -EINVAL;
2499         if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len)
2500                 return -EINVAL;
2501         flags = READ_ONCE(sqe->timeout_flags);
2502         if (flags)
2503                 return -EINVAL;
2504
2505         spin_lock_irq(&ctx->completion_lock);
2506         ret = io_timeout_cancel(ctx, READ_ONCE(sqe->addr));
2507
2508         io_cqring_fill_event(req, ret);
2509         io_commit_cqring(ctx);
2510         spin_unlock_irq(&ctx->completion_lock);
2511         io_cqring_ev_posted(ctx);
2512         if (ret < 0 && req->flags & REQ_F_LINK)
2513                 req->flags |= REQ_F_FAIL_LINK;
2514         io_put_req(req);
2515         return 0;
2516 }
2517
2518 static int io_timeout_setup(struct io_kiocb *req)
2519 {
2520         const struct io_uring_sqe *sqe = req->sqe;
2521         struct io_timeout_data *data;
2522         unsigned flags;
2523
2524         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2525                 return -EINVAL;
2526         if (sqe->ioprio || sqe->buf_index || sqe->len != 1)
2527                 return -EINVAL;
2528         flags = READ_ONCE(sqe->timeout_flags);
2529         if (flags & ~IORING_TIMEOUT_ABS)
2530                 return -EINVAL;
2531
2532         data = kzalloc(sizeof(struct io_timeout_data), GFP_KERNEL);
2533         if (!data)
2534                 return -ENOMEM;
2535         data->req = req;
2536         req->timeout.data = data;
2537         req->flags |= REQ_F_TIMEOUT;
2538
2539         if (get_timespec64(&data->ts, u64_to_user_ptr(sqe->addr)))
2540                 return -EFAULT;
2541
2542         if (flags & IORING_TIMEOUT_ABS)
2543                 data->mode = HRTIMER_MODE_ABS;
2544         else
2545                 data->mode = HRTIMER_MODE_REL;
2546
2547         hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
2548         return 0;
2549 }
2550
2551 static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2552 {
2553         unsigned count;
2554         struct io_ring_ctx *ctx = req->ctx;
2555         struct io_timeout_data *data;
2556         struct list_head *entry;
2557         unsigned span = 0;
2558         int ret;
2559
2560         ret = io_timeout_setup(req);
2561         /* common setup allows flags (like links) set, we don't */
2562         if (!ret && sqe->flags)
2563                 ret = -EINVAL;
2564         if (ret)
2565                 return ret;
2566
2567         /*
2568          * sqe->off holds how many events that need to occur for this
2569          * timeout event to be satisfied. If it isn't set, then this is
2570          * a pure timeout request, sequence isn't used.
2571          */
2572         count = READ_ONCE(sqe->off);
2573         if (!count) {
2574                 req->flags |= REQ_F_TIMEOUT_NOSEQ;
2575                 spin_lock_irq(&ctx->completion_lock);
2576                 entry = ctx->timeout_list.prev;
2577                 goto add;
2578         }
2579
2580         req->sequence = ctx->cached_sq_head + count - 1;
2581         req->timeout.data->seq_offset = count;
2582
2583         /*
2584          * Insertion sort, ensuring the first entry in the list is always
2585          * the one we need first.
2586          */
2587         spin_lock_irq(&ctx->completion_lock);
2588         list_for_each_prev(entry, &ctx->timeout_list) {
2589                 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
2590                 unsigned nxt_sq_head;
2591                 long long tmp, tmp_nxt;
2592                 u32 nxt_offset = nxt->timeout.data->seq_offset;
2593
2594                 if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
2595                         continue;
2596
2597                 /*
2598                  * Since cached_sq_head + count - 1 can overflow, use type long
2599                  * long to store it.
2600                  */
2601                 tmp = (long long)ctx->cached_sq_head + count - 1;
2602                 nxt_sq_head = nxt->sequence - nxt_offset + 1;
2603                 tmp_nxt = (long long)nxt_sq_head + nxt_offset - 1;
2604
2605                 /*
2606                  * cached_sq_head may overflow, and it will never overflow twice
2607                  * once there is some timeout req still be valid.
2608                  */
2609                 if (ctx->cached_sq_head < nxt_sq_head)
2610                         tmp += UINT_MAX;
2611
2612                 if (tmp > tmp_nxt)
2613                         break;
2614
2615                 /*
2616                  * Sequence of reqs after the insert one and itself should
2617                  * be adjusted because each timeout req consumes a slot.
2618                  */
2619                 span++;
2620                 nxt->sequence++;
2621         }
2622         req->sequence -= span;
2623 add:
2624         list_add(&req->list, entry);
2625         data = req->timeout.data;
2626         data->timer.function = io_timeout_fn;
2627         hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
2628         spin_unlock_irq(&ctx->completion_lock);
2629         return 0;
2630 }
2631
2632 static bool io_cancel_cb(struct io_wq_work *work, void *data)
2633 {
2634         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2635
2636         return req->user_data == (unsigned long) data;
2637 }
2638
2639 static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
2640 {
2641         enum io_wq_cancel cancel_ret;
2642         int ret = 0;
2643
2644         cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
2645         switch (cancel_ret) {
2646         case IO_WQ_CANCEL_OK:
2647                 ret = 0;
2648                 break;
2649         case IO_WQ_CANCEL_RUNNING:
2650                 ret = -EALREADY;
2651                 break;
2652         case IO_WQ_CANCEL_NOTFOUND:
2653                 ret = -ENOENT;
2654                 break;
2655         }
2656
2657         return ret;
2658 }
2659
2660 static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
2661                                      struct io_kiocb *req, __u64 sqe_addr,
2662                                      struct io_kiocb **nxt, int success_ret)
2663 {
2664         unsigned long flags;
2665         int ret;
2666
2667         ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
2668         if (ret != -ENOENT) {
2669                 spin_lock_irqsave(&ctx->completion_lock, flags);
2670                 goto done;
2671         }
2672
2673         spin_lock_irqsave(&ctx->completion_lock, flags);
2674         ret = io_timeout_cancel(ctx, sqe_addr);
2675         if (ret != -ENOENT)
2676                 goto done;
2677         ret = io_poll_cancel(ctx, sqe_addr);
2678 done:
2679         if (!ret)
2680                 ret = success_ret;
2681         io_cqring_fill_event(req, ret);
2682         io_commit_cqring(ctx);
2683         spin_unlock_irqrestore(&ctx->completion_lock, flags);
2684         io_cqring_ev_posted(ctx);
2685
2686         if (ret < 0 && (req->flags & REQ_F_LINK))
2687                 req->flags |= REQ_F_FAIL_LINK;
2688         io_put_req_find_next(req, nxt);
2689 }
2690
2691 static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2692                            struct io_kiocb **nxt)
2693 {
2694         struct io_ring_ctx *ctx = req->ctx;
2695
2696         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2697                 return -EINVAL;
2698         if (sqe->flags || sqe->ioprio || sqe->off || sqe->len ||
2699             sqe->cancel_flags)
2700                 return -EINVAL;
2701
2702         io_async_find_and_cancel(ctx, req, READ_ONCE(sqe->addr), nxt, 0);
2703         return 0;
2704 }
2705
2706 static int io_req_defer_prep(struct io_kiocb *req, struct io_async_ctx *io)
2707 {
2708         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
2709         struct iov_iter iter;
2710         ssize_t ret;
2711
2712         memcpy(&io->sqe, req->sqe, sizeof(io->sqe));
2713         req->sqe = &io->sqe;
2714
2715         switch (io->sqe.opcode) {
2716         case IORING_OP_READV:
2717         case IORING_OP_READ_FIXED:
2718                 ret = io_read_prep(req, &iovec, &iter, true);
2719                 break;
2720         case IORING_OP_WRITEV:
2721         case IORING_OP_WRITE_FIXED:
2722                 ret = io_write_prep(req, &iovec, &iter, true);
2723                 break;
2724         default:
2725                 req->io = io;
2726                 return 0;
2727         }
2728
2729         if (ret < 0)
2730                 return ret;
2731
2732         req->io = io;
2733         io_req_map_io(req, ret, iovec, inline_vecs, &iter);
2734         return 0;
2735 }
2736
2737 static int io_req_defer(struct io_kiocb *req)
2738 {
2739         struct io_ring_ctx *ctx = req->ctx;
2740         struct io_async_ctx *io;
2741         int ret;
2742
2743         /* Still need defer if there is pending req in defer list. */
2744         if (!req_need_defer(req) && list_empty(&ctx->defer_list))
2745                 return 0;
2746
2747         io = kmalloc(sizeof(*io), GFP_KERNEL);
2748         if (!io)
2749                 return -EAGAIN;
2750
2751         spin_lock_irq(&ctx->completion_lock);
2752         if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
2753                 spin_unlock_irq(&ctx->completion_lock);
2754                 kfree(io);
2755                 return 0;
2756         }
2757
2758         ret = io_req_defer_prep(req, io);
2759         if (ret < 0)
2760                 return ret;
2761
2762         trace_io_uring_defer(ctx, req, req->user_data);
2763         list_add_tail(&req->list, &ctx->defer_list);
2764         spin_unlock_irq(&ctx->completion_lock);
2765         return -EIOCBQUEUED;
2766 }
2767
2768 __attribute__((nonnull))
2769 static int io_issue_sqe(struct io_kiocb *req, struct io_kiocb **nxt,
2770                         bool force_nonblock)
2771 {
2772         int ret, opcode;
2773         struct io_ring_ctx *ctx = req->ctx;
2774
2775         opcode = READ_ONCE(req->sqe->opcode);
2776         switch (opcode) {
2777         case IORING_OP_NOP:
2778                 ret = io_nop(req);
2779                 break;
2780         case IORING_OP_READV:
2781                 if (unlikely(req->sqe->buf_index))
2782                         return -EINVAL;
2783                 ret = io_read(req, nxt, force_nonblock);
2784                 break;
2785         case IORING_OP_WRITEV:
2786                 if (unlikely(req->sqe->buf_index))
2787                         return -EINVAL;
2788                 ret = io_write(req, nxt, force_nonblock);
2789                 break;
2790         case IORING_OP_READ_FIXED:
2791                 ret = io_read(req, nxt, force_nonblock);
2792                 break;
2793         case IORING_OP_WRITE_FIXED:
2794                 ret = io_write(req, nxt, force_nonblock);
2795                 break;
2796         case IORING_OP_FSYNC:
2797                 ret = io_fsync(req, req->sqe, nxt, force_nonblock);
2798                 break;
2799         case IORING_OP_POLL_ADD:
2800                 ret = io_poll_add(req, req->sqe, nxt);
2801                 break;
2802         case IORING_OP_POLL_REMOVE:
2803                 ret = io_poll_remove(req, req->sqe);
2804                 break;
2805         case IORING_OP_SYNC_FILE_RANGE:
2806                 ret = io_sync_file_range(req, req->sqe, nxt, force_nonblock);
2807                 break;
2808         case IORING_OP_SENDMSG:
2809                 ret = io_sendmsg(req, req->sqe, nxt, force_nonblock);
2810                 break;
2811         case IORING_OP_RECVMSG:
2812                 ret = io_recvmsg(req, req->sqe, nxt, force_nonblock);
2813                 break;
2814         case IORING_OP_TIMEOUT:
2815                 ret = io_timeout(req, req->sqe);
2816                 break;
2817         case IORING_OP_TIMEOUT_REMOVE:
2818                 ret = io_timeout_remove(req, req->sqe);
2819                 break;
2820         case IORING_OP_ACCEPT:
2821                 ret = io_accept(req, req->sqe, nxt, force_nonblock);
2822                 break;
2823         case IORING_OP_CONNECT:
2824                 ret = io_connect(req, req->sqe, nxt, force_nonblock);
2825                 break;
2826         case IORING_OP_ASYNC_CANCEL:
2827                 ret = io_async_cancel(req, req->sqe, nxt);
2828                 break;
2829         default:
2830                 ret = -EINVAL;
2831                 break;
2832         }
2833
2834         if (ret)
2835                 return ret;
2836
2837         if (ctx->flags & IORING_SETUP_IOPOLL) {
2838                 if (req->result == -EAGAIN)
2839                         return -EAGAIN;
2840
2841                 /* workqueue context doesn't hold uring_lock, grab it now */
2842                 if (req->in_async)
2843                         mutex_lock(&ctx->uring_lock);
2844                 io_iopoll_req_issued(req);
2845                 if (req->in_async)
2846                         mutex_unlock(&ctx->uring_lock);
2847         }
2848
2849         return 0;
2850 }
2851
2852 static void io_link_work_cb(struct io_wq_work **workptr)
2853 {
2854         struct io_wq_work *work = *workptr;
2855         struct io_kiocb *link = work->data;
2856
2857         io_queue_linked_timeout(link);
2858         work->func = io_wq_submit_work;
2859 }
2860
2861 static void io_wq_submit_work(struct io_wq_work **workptr)
2862 {
2863         struct io_wq_work *work = *workptr;
2864         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2865         struct io_kiocb *nxt = NULL;
2866         int ret = 0;
2867
2868         /* Ensure we clear previously set non-block flag */
2869         req->rw.ki_flags &= ~IOCB_NOWAIT;
2870
2871         if (work->flags & IO_WQ_WORK_CANCEL)
2872                 ret = -ECANCELED;
2873
2874         if (!ret) {
2875                 req->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
2876                 req->in_async = true;
2877                 do {
2878                         ret = io_issue_sqe(req, &nxt, false);
2879                         /*
2880                          * We can get EAGAIN for polled IO even though we're
2881                          * forcing a sync submission from here, since we can't
2882                          * wait for request slots on the block side.
2883                          */
2884                         if (ret != -EAGAIN)
2885                                 break;
2886                         cond_resched();
2887                 } while (1);
2888         }
2889
2890         /* drop submission reference */
2891         io_put_req(req);
2892
2893         if (ret) {
2894                 if (req->flags & REQ_F_LINK)
2895                         req->flags |= REQ_F_FAIL_LINK;
2896                 io_cqring_add_event(req, ret);
2897                 io_put_req(req);
2898         }
2899
2900         /* if a dependent link is ready, pass it back */
2901         if (!ret && nxt) {
2902                 struct io_kiocb *link;
2903
2904                 io_prep_async_work(nxt, &link);
2905                 *workptr = &nxt->work;
2906                 if (link) {
2907                         nxt->work.flags |= IO_WQ_WORK_CB;
2908                         nxt->work.func = io_link_work_cb;
2909                         nxt->work.data = link;
2910                 }
2911         }
2912 }
2913
2914 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
2915 {
2916         int op = READ_ONCE(sqe->opcode);
2917
2918         switch (op) {
2919         case IORING_OP_NOP:
2920         case IORING_OP_POLL_REMOVE:
2921         case IORING_OP_TIMEOUT:
2922         case IORING_OP_TIMEOUT_REMOVE:
2923         case IORING_OP_ASYNC_CANCEL:
2924         case IORING_OP_LINK_TIMEOUT:
2925                 return false;
2926         default:
2927                 return true;
2928         }
2929 }
2930
2931 static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
2932                                               int index)
2933 {
2934         struct fixed_file_table *table;
2935
2936         table = &ctx->file_table[index >> IORING_FILE_TABLE_SHIFT];
2937         return table->files[index & IORING_FILE_TABLE_MASK];
2938 }
2939
2940 static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req)
2941 {
2942         struct io_ring_ctx *ctx = req->ctx;
2943         unsigned flags;
2944         int fd;
2945
2946         flags = READ_ONCE(req->sqe->flags);
2947         fd = READ_ONCE(req->sqe->fd);
2948
2949         if (flags & IOSQE_IO_DRAIN)
2950                 req->flags |= REQ_F_IO_DRAIN;
2951
2952         if (!io_op_needs_file(req->sqe))
2953                 return 0;
2954
2955         if (flags & IOSQE_FIXED_FILE) {
2956                 if (unlikely(!ctx->file_table ||
2957                     (unsigned) fd >= ctx->nr_user_files))
2958                         return -EBADF;
2959                 fd = array_index_nospec(fd, ctx->nr_user_files);
2960                 req->file = io_file_from_index(ctx, fd);
2961                 if (!req->file)
2962                         return -EBADF;
2963                 req->flags |= REQ_F_FIXED_FILE;
2964         } else {
2965                 if (req->needs_fixed_file)
2966                         return -EBADF;
2967                 trace_io_uring_file_get(ctx, fd);
2968                 req->file = io_file_get(state, fd);
2969                 if (unlikely(!req->file))
2970                         return -EBADF;
2971         }
2972
2973         return 0;
2974 }
2975
2976 static int io_grab_files(struct io_kiocb *req)
2977 {
2978         int ret = -EBADF;
2979         struct io_ring_ctx *ctx = req->ctx;
2980
2981         rcu_read_lock();
2982         spin_lock_irq(&ctx->inflight_lock);
2983         /*
2984          * We use the f_ops->flush() handler to ensure that we can flush
2985          * out work accessing these files if the fd is closed. Check if
2986          * the fd has changed since we started down this path, and disallow
2987          * this operation if it has.
2988          */
2989         if (fcheck(req->ring_fd) == req->ring_file) {
2990                 list_add(&req->inflight_entry, &ctx->inflight_list);
2991                 req->flags |= REQ_F_INFLIGHT;
2992                 req->work.files = current->files;
2993                 ret = 0;
2994         }
2995         spin_unlock_irq(&ctx->inflight_lock);
2996         rcu_read_unlock();
2997
2998         return ret;
2999 }
3000
3001 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
3002 {
3003         struct io_timeout_data *data = container_of(timer,
3004                                                 struct io_timeout_data, timer);
3005         struct io_kiocb *req = data->req;
3006         struct io_ring_ctx *ctx = req->ctx;
3007         struct io_kiocb *prev = NULL;
3008         unsigned long flags;
3009
3010         spin_lock_irqsave(&ctx->completion_lock, flags);
3011
3012         /*
3013          * We don't expect the list to be empty, that will only happen if we
3014          * race with the completion of the linked work.
3015          */
3016         if (!list_empty(&req->list)) {
3017                 prev = list_entry(req->list.prev, struct io_kiocb, link_list);
3018                 if (refcount_inc_not_zero(&prev->refs)) {
3019                         list_del_init(&req->list);
3020                         prev->flags &= ~REQ_F_LINK_TIMEOUT;
3021                 } else
3022                         prev = NULL;
3023         }
3024
3025         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3026
3027         if (prev) {
3028                 if (prev->flags & REQ_F_LINK)
3029                         prev->flags |= REQ_F_FAIL_LINK;
3030                 io_async_find_and_cancel(ctx, req, prev->user_data, NULL,
3031                                                 -ETIME);
3032                 io_put_req(prev);
3033         } else {
3034                 io_cqring_add_event(req, -ETIME);
3035                 io_put_req(req);
3036         }
3037         return HRTIMER_NORESTART;
3038 }
3039
3040 static void io_queue_linked_timeout(struct io_kiocb *req)
3041 {
3042         struct io_ring_ctx *ctx = req->ctx;
3043
3044         /*
3045          * If the list is now empty, then our linked request finished before
3046          * we got a chance to setup the timer
3047          */
3048         spin_lock_irq(&ctx->completion_lock);
3049         if (!list_empty(&req->list)) {
3050                 struct io_timeout_data *data = req->timeout.data;
3051
3052                 data->timer.function = io_link_timeout_fn;
3053                 hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
3054                                 data->mode);
3055         }
3056         spin_unlock_irq(&ctx->completion_lock);
3057
3058         /* drop submission reference */
3059         io_put_req(req);
3060 }
3061
3062 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
3063 {
3064         struct io_kiocb *nxt;
3065
3066         if (!(req->flags & REQ_F_LINK))
3067                 return NULL;
3068
3069         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list);
3070         if (!nxt || nxt->sqe->opcode != IORING_OP_LINK_TIMEOUT)
3071                 return NULL;
3072
3073         req->flags |= REQ_F_LINK_TIMEOUT;
3074         return nxt;
3075 }
3076
3077 static void __io_queue_sqe(struct io_kiocb *req)
3078 {
3079         struct io_kiocb *linked_timeout = io_prep_linked_timeout(req);
3080         struct io_kiocb *nxt = NULL;
3081         int ret;
3082
3083         ret = io_issue_sqe(req, &nxt, true);
3084         if (nxt)
3085                 io_queue_async_work(nxt);
3086
3087         /*
3088          * We async punt it if the file wasn't marked NOWAIT, or if the file
3089          * doesn't support non-blocking read/write attempts
3090          */
3091         if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
3092             (req->flags & REQ_F_MUST_PUNT))) {
3093                 if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
3094                         ret = io_grab_files(req);
3095                         if (ret)
3096                                 goto err;
3097                 }
3098
3099                 /*
3100                  * Queued up for async execution, worker will release
3101                  * submit reference when the iocb is actually submitted.
3102                  */
3103                 io_queue_async_work(req);
3104                 return;
3105         }
3106
3107 err:
3108         /* drop submission reference */
3109         io_put_req(req);
3110
3111         if (linked_timeout) {
3112                 if (!ret)
3113                         io_queue_linked_timeout(linked_timeout);
3114                 else
3115                         io_put_req(linked_timeout);
3116         }
3117
3118         /* and drop final reference, if we failed */
3119         if (ret) {
3120                 io_cqring_add_event(req, ret);
3121                 if (req->flags & REQ_F_LINK)
3122                         req->flags |= REQ_F_FAIL_LINK;
3123                 io_put_req(req);
3124         }
3125 }
3126
3127 static void io_queue_sqe(struct io_kiocb *req)
3128 {
3129         int ret;
3130
3131         if (unlikely(req->ctx->drain_next)) {
3132                 req->flags |= REQ_F_IO_DRAIN;
3133                 req->ctx->drain_next = false;
3134         }
3135         req->ctx->drain_next = (req->flags & REQ_F_DRAIN_LINK);
3136
3137         ret = io_req_defer(req);
3138         if (ret) {
3139                 if (ret != -EIOCBQUEUED) {
3140                         io_cqring_add_event(req, ret);
3141                         if (req->flags & REQ_F_LINK)
3142                                 req->flags |= REQ_F_FAIL_LINK;
3143                         io_double_put_req(req);
3144                 }
3145         } else
3146                 __io_queue_sqe(req);
3147 }
3148
3149 static inline void io_queue_link_head(struct io_kiocb *req)
3150 {
3151         if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
3152                 io_cqring_add_event(req, -ECANCELED);
3153                 io_double_put_req(req);
3154         } else
3155                 io_queue_sqe(req);
3156 }
3157
3158
3159 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
3160
3161 static void io_submit_sqe(struct io_kiocb *req, struct io_submit_state *state,
3162                           struct io_kiocb **link)
3163 {
3164         struct io_ring_ctx *ctx = req->ctx;
3165         int ret;
3166
3167         req->user_data = req->sqe->user_data;
3168
3169         /* enforce forwards compatibility on users */
3170         if (unlikely(req->sqe->flags & ~SQE_VALID_FLAGS)) {
3171                 ret = -EINVAL;
3172                 goto err_req;
3173         }
3174
3175         ret = io_req_set_file(state, req);
3176         if (unlikely(ret)) {
3177 err_req:
3178                 io_cqring_add_event(req, ret);
3179                 io_double_put_req(req);
3180                 return;
3181         }
3182
3183         /*
3184          * If we already have a head request, queue this one for async
3185          * submittal once the head completes. If we don't have a head but
3186          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
3187          * submitted sync once the chain is complete. If none of those
3188          * conditions are true (normal request), then just queue it.
3189          */
3190         if (*link) {
3191                 struct io_kiocb *prev = *link;
3192                 struct io_async_ctx *io;
3193
3194                 if (req->sqe->flags & IOSQE_IO_DRAIN)
3195                         (*link)->flags |= REQ_F_DRAIN_LINK | REQ_F_IO_DRAIN;
3196
3197                 if (READ_ONCE(req->sqe->opcode) == IORING_OP_LINK_TIMEOUT) {
3198                         ret = io_timeout_setup(req);
3199                         /* common setup allows offset being set, we don't */
3200                         if (!ret && req->sqe->off)
3201                                 ret = -EINVAL;
3202                         if (ret) {
3203                                 prev->flags |= REQ_F_FAIL_LINK;
3204                                 goto err_req;
3205                         }
3206                 }
3207
3208                 io = kmalloc(sizeof(*io), GFP_KERNEL);
3209                 if (!io) {
3210                         ret = -EAGAIN;
3211                         goto err_req;
3212                 }
3213
3214                 ret = io_req_defer_prep(req, io);
3215                 if (ret)
3216                         goto err_req;
3217                 trace_io_uring_link(ctx, req, prev);
3218                 list_add_tail(&req->list, &prev->link_list);
3219         } else if (req->sqe->flags & IOSQE_IO_LINK) {
3220                 req->flags |= REQ_F_LINK;
3221
3222                 INIT_LIST_HEAD(&req->link_list);
3223                 *link = req;
3224         } else {
3225                 io_queue_sqe(req);
3226         }
3227 }
3228
3229 /*
3230  * Batched submission is done, ensure local IO is flushed out.
3231  */
3232 static void io_submit_state_end(struct io_submit_state *state)
3233 {
3234         blk_finish_plug(&state->plug);
3235         io_file_put(state);
3236         if (state->free_reqs)
3237                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
3238                                         &state->reqs[state->cur_req]);
3239 }
3240
3241 /*
3242  * Start submission side cache.
3243  */
3244 static void io_submit_state_start(struct io_submit_state *state,
3245                                   struct io_ring_ctx *ctx, unsigned max_ios)
3246 {
3247         blk_start_plug(&state->plug);
3248         state->free_reqs = 0;
3249         state->file = NULL;
3250         state->ios_left = max_ios;
3251 }
3252
3253 static void io_commit_sqring(struct io_ring_ctx *ctx)
3254 {
3255         struct io_rings *rings = ctx->rings;
3256
3257         if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
3258                 /*
3259                  * Ensure any loads from the SQEs are done at this point,
3260                  * since once we write the new head, the application could
3261                  * write new data to them.
3262                  */
3263                 smp_store_release(&rings->sq.head, ctx->cached_sq_head);
3264         }
3265 }
3266
3267 /*
3268  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
3269  * that is mapped by userspace. This means that care needs to be taken to
3270  * ensure that reads are stable, as we cannot rely on userspace always
3271  * being a good citizen. If members of the sqe are validated and then later
3272  * used, it's important that those reads are done through READ_ONCE() to
3273  * prevent a re-load down the line.
3274  */
3275 static bool io_get_sqring(struct io_ring_ctx *ctx, struct io_kiocb *req)
3276 {
3277         struct io_rings *rings = ctx->rings;
3278         u32 *sq_array = ctx->sq_array;
3279         unsigned head;
3280
3281         /*
3282          * The cached sq head (or cq tail) serves two purposes:
3283          *
3284          * 1) allows us to batch the cost of updating the user visible
3285          *    head updates.
3286          * 2) allows the kernel side to track the head on its own, even
3287          *    though the application is the one updating it.
3288          */
3289         head = ctx->cached_sq_head;
3290         /* make sure SQ entry isn't read before tail */
3291         if (unlikely(head == smp_load_acquire(&rings->sq.tail)))
3292                 return false;
3293
3294         head = READ_ONCE(sq_array[head & ctx->sq_mask]);
3295         if (likely(head < ctx->sq_entries)) {
3296                 /*
3297                  * All io need record the previous position, if LINK vs DARIN,
3298                  * it can be used to mark the position of the first IO in the
3299                  * link list.
3300                  */
3301                 req->sequence = ctx->cached_sq_head;
3302                 req->sqe = &ctx->sq_sqes[head];
3303                 ctx->cached_sq_head++;
3304                 return true;
3305         }
3306
3307         /* drop invalid entries */
3308         ctx->cached_sq_head++;
3309         ctx->cached_sq_dropped++;
3310         WRITE_ONCE(rings->sq_dropped, ctx->cached_sq_dropped);
3311         return false;
3312 }
3313
3314 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
3315                           struct file *ring_file, int ring_fd,
3316                           struct mm_struct **mm, bool async)
3317 {
3318         struct io_submit_state state, *statep = NULL;
3319         struct io_kiocb *link = NULL;
3320         int i, submitted = 0;
3321         bool mm_fault = false;
3322
3323         /* if we have a backlog and couldn't flush it all, return BUSY */
3324         if (!list_empty(&ctx->cq_overflow_list) &&
3325             !io_cqring_overflow_flush(ctx, false))
3326                 return -EBUSY;
3327
3328         if (nr > IO_PLUG_THRESHOLD) {
3329                 io_submit_state_start(&state, ctx, nr);
3330                 statep = &state;
3331         }
3332
3333         for (i = 0; i < nr; i++) {
3334                 struct io_kiocb *req;
3335                 unsigned int sqe_flags;
3336
3337                 req = io_get_req(ctx, statep);
3338                 if (unlikely(!req)) {
3339                         if (!submitted)
3340                                 submitted = -EAGAIN;
3341                         break;
3342                 }
3343                 if (!io_get_sqring(ctx, req)) {
3344                         __io_free_req(req);
3345                         break;
3346                 }
3347
3348                 if (io_sqe_needs_user(req->sqe) && !*mm) {
3349                         mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm);
3350                         if (!mm_fault) {
3351                                 use_mm(ctx->sqo_mm);
3352                                 *mm = ctx->sqo_mm;
3353                         }
3354                 }
3355
3356                 sqe_flags = req->sqe->flags;
3357
3358                 req->ring_file = ring_file;
3359                 req->ring_fd = ring_fd;
3360                 req->has_user = *mm != NULL;
3361                 req->in_async = async;
3362                 req->needs_fixed_file = async;
3363                 trace_io_uring_submit_sqe(ctx, req->sqe->user_data,
3364                                           true, async);
3365                 io_submit_sqe(req, statep, &link);
3366                 submitted++;
3367
3368                 /*
3369                  * If previous wasn't linked and we have a linked command,
3370                  * that's the end of the chain. Submit the previous link.
3371                  */
3372                 if (!(sqe_flags & IOSQE_IO_LINK) && link) {
3373                         io_queue_link_head(link);
3374                         link = NULL;
3375                 }
3376         }
3377
3378         if (link)
3379                 io_queue_link_head(link);
3380         if (statep)
3381                 io_submit_state_end(&state);
3382
3383          /* Commit SQ ring head once we've consumed and submitted all SQEs */
3384         io_commit_sqring(ctx);
3385
3386         return submitted;
3387 }
3388
3389 static int io_sq_thread(void *data)
3390 {
3391         struct io_ring_ctx *ctx = data;
3392         struct mm_struct *cur_mm = NULL;
3393         const struct cred *old_cred;
3394         mm_segment_t old_fs;
3395         DEFINE_WAIT(wait);
3396         unsigned inflight;
3397         unsigned long timeout;
3398         int ret;
3399
3400         complete(&ctx->completions[1]);
3401
3402         old_fs = get_fs();
3403         set_fs(USER_DS);
3404         old_cred = override_creds(ctx->creds);
3405
3406         ret = timeout = inflight = 0;
3407         while (!kthread_should_park()) {
3408                 unsigned int to_submit;
3409
3410                 if (inflight) {
3411                         unsigned nr_events = 0;
3412
3413                         if (ctx->flags & IORING_SETUP_IOPOLL) {
3414                                 /*
3415                                  * inflight is the count of the maximum possible
3416                                  * entries we submitted, but it can be smaller
3417                                  * if we dropped some of them. If we don't have
3418                                  * poll entries available, then we know that we
3419                                  * have nothing left to poll for. Reset the
3420                                  * inflight count to zero in that case.
3421                                  */
3422                                 mutex_lock(&ctx->uring_lock);
3423                                 if (!list_empty(&ctx->poll_list))
3424                                         __io_iopoll_check(ctx, &nr_events, 0);
3425                                 else
3426                                         inflight = 0;
3427                                 mutex_unlock(&ctx->uring_lock);
3428                         } else {
3429                                 /*
3430                                  * Normal IO, just pretend everything completed.
3431                                  * We don't have to poll completions for that.
3432                                  */
3433                                 nr_events = inflight;
3434                         }
3435
3436                         inflight -= nr_events;
3437                         if (!inflight)
3438                                 timeout = jiffies + ctx->sq_thread_idle;
3439                 }
3440
3441                 to_submit = io_sqring_entries(ctx);
3442
3443                 /*
3444                  * If submit got -EBUSY, flag us as needing the application
3445                  * to enter the kernel to reap and flush events.
3446                  */
3447                 if (!to_submit || ret == -EBUSY) {
3448                         /*
3449                          * We're polling. If we're within the defined idle
3450                          * period, then let us spin without work before going
3451                          * to sleep. The exception is if we got EBUSY doing
3452                          * more IO, we should wait for the application to
3453                          * reap events and wake us up.
3454                          */
3455                         if (inflight ||
3456                             (!time_after(jiffies, timeout) && ret != -EBUSY)) {
3457                                 cond_resched();
3458                                 continue;
3459                         }
3460
3461                         /*
3462                          * Drop cur_mm before scheduling, we can't hold it for
3463                          * long periods (or over schedule()). Do this before
3464                          * adding ourselves to the waitqueue, as the unuse/drop
3465                          * may sleep.
3466                          */
3467                         if (cur_mm) {
3468                                 unuse_mm(cur_mm);
3469                                 mmput(cur_mm);
3470                                 cur_mm = NULL;
3471                         }
3472
3473                         prepare_to_wait(&ctx->sqo_wait, &wait,
3474                                                 TASK_INTERRUPTIBLE);
3475
3476                         /* Tell userspace we may need a wakeup call */
3477                         ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
3478                         /* make sure to read SQ tail after writing flags */
3479                         smp_mb();
3480
3481                         to_submit = io_sqring_entries(ctx);
3482                         if (!to_submit || ret == -EBUSY) {
3483                                 if (kthread_should_park()) {
3484                                         finish_wait(&ctx->sqo_wait, &wait);
3485                                         break;
3486                                 }
3487                                 if (signal_pending(current))
3488                                         flush_signals(current);
3489                                 schedule();
3490                                 finish_wait(&ctx->sqo_wait, &wait);
3491
3492                                 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
3493                                 continue;
3494                         }
3495                         finish_wait(&ctx->sqo_wait, &wait);
3496
3497                         ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
3498                 }
3499
3500                 to_submit = min(to_submit, ctx->sq_entries);
3501                 ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
3502                 if (ret > 0)
3503                         inflight += ret;
3504         }
3505
3506         set_fs(old_fs);
3507         if (cur_mm) {
3508                 unuse_mm(cur_mm);
3509                 mmput(cur_mm);
3510         }
3511         revert_creds(old_cred);
3512
3513         kthread_parkme();
3514
3515         return 0;
3516 }
3517
3518 struct io_wait_queue {
3519         struct wait_queue_entry wq;
3520         struct io_ring_ctx *ctx;
3521         unsigned to_wait;
3522         unsigned nr_timeouts;
3523 };
3524
3525 static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
3526 {
3527         struct io_ring_ctx *ctx = iowq->ctx;
3528
3529         /*
3530          * Wake up if we have enough events, or if a timeout occured since we
3531          * started waiting. For timeouts, we always want to return to userspace,
3532          * regardless of event count.
3533          */
3534         return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
3535                         atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
3536 }
3537
3538 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
3539                             int wake_flags, void *key)
3540 {
3541         struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
3542                                                         wq);
3543
3544         /* use noflush == true, as we can't safely rely on locking context */
3545         if (!io_should_wake(iowq, true))
3546                 return -1;
3547
3548         return autoremove_wake_function(curr, mode, wake_flags, key);
3549 }
3550
3551 /*
3552  * Wait until events become available, if we don't already have some. The
3553  * application must reap them itself, as they reside on the shared cq ring.
3554  */
3555 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
3556                           const sigset_t __user *sig, size_t sigsz)
3557 {
3558         struct io_wait_queue iowq = {
3559                 .wq = {
3560                         .private        = current,
3561                         .func           = io_wake_function,
3562                         .entry          = LIST_HEAD_INIT(iowq.wq.entry),
3563                 },
3564                 .ctx            = ctx,
3565                 .to_wait        = min_events,
3566         };
3567         struct io_rings *rings = ctx->rings;
3568         int ret = 0;
3569
3570         if (io_cqring_events(ctx, false) >= min_events)
3571                 return 0;
3572
3573         if (sig) {
3574 #ifdef CONFIG_COMPAT
3575                 if (in_compat_syscall())
3576                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
3577                                                       sigsz);
3578                 else
3579 #endif
3580                         ret = set_user_sigmask(sig, sigsz);
3581
3582                 if (ret)
3583                         return ret;
3584         }
3585
3586         iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
3587         trace_io_uring_cqring_wait(ctx, min_events);
3588         do {
3589                 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
3590                                                 TASK_INTERRUPTIBLE);
3591                 if (io_should_wake(&iowq, false))
3592                         break;
3593                 schedule();
3594                 if (signal_pending(current)) {
3595                         ret = -EINTR;
3596                         break;
3597                 }
3598         } while (1);
3599         finish_wait(&ctx->wait, &iowq.wq);
3600
3601         restore_saved_sigmask_unless(ret == -EINTR);
3602
3603         return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
3604 }
3605
3606 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
3607 {
3608 #if defined(CONFIG_UNIX)
3609         if (ctx->ring_sock) {
3610                 struct sock *sock = ctx->ring_sock->sk;
3611                 struct sk_buff *skb;
3612
3613                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
3614                         kfree_skb(skb);
3615         }
3616 #else
3617         int i;
3618
3619         for (i = 0; i < ctx->nr_user_files; i++) {
3620                 struct file *file;
3621
3622                 file = io_file_from_index(ctx, i);
3623                 if (file)
3624                         fput(file);
3625         }
3626 #endif
3627 }
3628
3629 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
3630 {
3631         unsigned nr_tables, i;
3632
3633         if (!ctx->file_table)
3634                 return -ENXIO;
3635
3636         __io_sqe_files_unregister(ctx);
3637         nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
3638         for (i = 0; i < nr_tables; i++)
3639                 kfree(ctx->file_table[i].files);
3640         kfree(ctx->file_table);
3641         ctx->file_table = NULL;
3642         ctx->nr_user_files = 0;
3643         return 0;
3644 }
3645
3646 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
3647 {
3648         if (ctx->sqo_thread) {
3649                 wait_for_completion(&ctx->completions[1]);
3650                 /*
3651                  * The park is a bit of a work-around, without it we get
3652                  * warning spews on shutdown with SQPOLL set and affinity
3653                  * set to a single CPU.
3654                  */
3655                 kthread_park(ctx->sqo_thread);
3656                 kthread_stop(ctx->sqo_thread);
3657                 ctx->sqo_thread = NULL;
3658         }
3659 }
3660
3661 static void io_finish_async(struct io_ring_ctx *ctx)
3662 {
3663         io_sq_thread_stop(ctx);
3664
3665         if (ctx->io_wq) {
3666                 io_wq_destroy(ctx->io_wq);
3667                 ctx->io_wq = NULL;
3668         }
3669 }
3670
3671 #if defined(CONFIG_UNIX)
3672 static void io_destruct_skb(struct sk_buff *skb)
3673 {
3674         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
3675
3676         if (ctx->io_wq)
3677                 io_wq_flush(ctx->io_wq);
3678
3679         unix_destruct_scm(skb);
3680 }
3681
3682 /*
3683  * Ensure the UNIX gc is aware of our file set, so we are certain that
3684  * the io_uring can be safely unregistered on process exit, even if we have
3685  * loops in the file referencing.
3686  */
3687 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
3688 {
3689         struct sock *sk = ctx->ring_sock->sk;
3690         struct scm_fp_list *fpl;
3691         struct sk_buff *skb;
3692         int i, nr_files;
3693
3694         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
3695                 unsigned long inflight = ctx->user->unix_inflight + nr;
3696
3697                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
3698                         return -EMFILE;
3699         }
3700
3701         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
3702         if (!fpl)
3703                 return -ENOMEM;
3704
3705         skb = alloc_skb(0, GFP_KERNEL);
3706         if (!skb) {
3707                 kfree(fpl);
3708                 return -ENOMEM;
3709         }
3710
3711         skb->sk = sk;
3712
3713         nr_files = 0;
3714         fpl->user = get_uid(ctx->user);
3715         for (i = 0; i < nr; i++) {
3716                 struct file *file = io_file_from_index(ctx, i + offset);
3717
3718                 if (!file)
3719                         continue;
3720                 fpl->fp[nr_files] = get_file(file);
3721                 unix_inflight(fpl->user, fpl->fp[nr_files]);
3722                 nr_files++;
3723         }
3724
3725         if (nr_files) {
3726                 fpl->max = SCM_MAX_FD;
3727                 fpl->count = nr_files;
3728                 UNIXCB(skb).fp = fpl;
3729                 skb->destructor = io_destruct_skb;
3730                 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
3731                 skb_queue_head(&sk->sk_receive_queue, skb);
3732
3733                 for (i = 0; i < nr_files; i++)
3734                         fput(fpl->fp[i]);
3735         } else {
3736                 kfree_skb(skb);
3737                 kfree(fpl);
3738         }
3739
3740         return 0;
3741 }
3742
3743 /*
3744  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
3745  * causes regular reference counting to break down. We rely on the UNIX
3746  * garbage collection to take care of this problem for us.
3747  */
3748 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
3749 {
3750         unsigned left, total;
3751         int ret = 0;
3752
3753         total = 0;
3754         left = ctx->nr_user_files;
3755         while (left) {
3756                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
3757
3758                 ret = __io_sqe_files_scm(ctx, this_files, total);
3759                 if (ret)
3760                         break;
3761                 left -= this_files;
3762                 total += this_files;
3763         }
3764
3765         if (!ret)
3766                 return 0;
3767
3768         while (total < ctx->nr_user_files) {
3769                 struct file *file = io_file_from_index(ctx, total);
3770
3771                 if (file)
3772                         fput(file);
3773                 total++;
3774         }
3775
3776         return ret;
3777 }
3778 #else
3779 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
3780 {
3781         return 0;
3782 }
3783 #endif
3784
3785 static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables,
3786                                     unsigned nr_files)
3787 {
3788         int i;
3789
3790         for (i = 0; i < nr_tables; i++) {
3791                 struct fixed_file_table *table = &ctx->file_table[i];
3792                 unsigned this_files;
3793
3794                 this_files = min(nr_files, IORING_MAX_FILES_TABLE);
3795                 table->files = kcalloc(this_files, sizeof(struct file *),
3796                                         GFP_KERNEL);
3797                 if (!table->files)
3798                         break;
3799                 nr_files -= this_files;
3800         }
3801
3802         if (i == nr_tables)
3803                 return 0;
3804
3805         for (i = 0; i < nr_tables; i++) {
3806                 struct fixed_file_table *table = &ctx->file_table[i];
3807                 kfree(table->files);
3808         }
3809         return 1;
3810 }
3811
3812 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
3813                                  unsigned nr_args)
3814 {
3815         __s32 __user *fds = (__s32 __user *) arg;
3816         unsigned nr_tables;
3817         int fd, ret = 0;
3818         unsigned i;
3819
3820         if (ctx->file_table)
3821                 return -EBUSY;
3822         if (!nr_args)
3823                 return -EINVAL;
3824         if (nr_args > IORING_MAX_FIXED_FILES)
3825                 return -EMFILE;
3826
3827         nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
3828         ctx->file_table = kcalloc(nr_tables, sizeof(struct fixed_file_table),
3829                                         GFP_KERNEL);
3830         if (!ctx->file_table)
3831                 return -ENOMEM;
3832
3833         if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
3834                 kfree(ctx->file_table);
3835                 ctx->file_table = NULL;
3836                 return -ENOMEM;
3837         }
3838
3839         for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
3840                 struct fixed_file_table *table;
3841                 unsigned index;
3842
3843                 ret = -EFAULT;
3844                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
3845                         break;
3846                 /* allow sparse sets */
3847                 if (fd == -1) {
3848                         ret = 0;
3849                         continue;
3850                 }
3851
3852                 table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT];
3853                 index = i & IORING_FILE_TABLE_MASK;
3854                 table->files[index] = fget(fd);
3855
3856                 ret = -EBADF;
3857                 if (!table->files[index])
3858                         break;
3859                 /*
3860                  * Don't allow io_uring instances to be registered. If UNIX
3861                  * isn't enabled, then this causes a reference cycle and this
3862                  * instance can never get freed. If UNIX is enabled we'll
3863                  * handle it just fine, but there's still no point in allowing
3864                  * a ring fd as it doesn't support regular read/write anyway.
3865                  */
3866                 if (table->files[index]->f_op == &io_uring_fops) {
3867                         fput(table->files[index]);
3868                         break;
3869                 }
3870                 ret = 0;
3871         }
3872
3873         if (ret) {
3874                 for (i = 0; i < ctx->nr_user_files; i++) {
3875                         struct file *file;
3876
3877                         file = io_file_from_index(ctx, i);
3878                         if (file)
3879                                 fput(file);
3880                 }
3881                 for (i = 0; i < nr_tables; i++)
3882                         kfree(ctx->file_table[i].files);
3883
3884                 kfree(ctx->file_table);
3885                 ctx->file_table = NULL;
3886                 ctx->nr_user_files = 0;
3887                 return ret;
3888         }
3889
3890         ret = io_sqe_files_scm(ctx);
3891         if (ret)
3892                 io_sqe_files_unregister(ctx);
3893
3894         return ret;
3895 }
3896
3897 static void io_sqe_file_unregister(struct io_ring_ctx *ctx, int index)
3898 {
3899 #if defined(CONFIG_UNIX)
3900         struct file *file = io_file_from_index(ctx, index);
3901         struct sock *sock = ctx->ring_sock->sk;
3902         struct sk_buff_head list, *head = &sock->sk_receive_queue;
3903         struct sk_buff *skb;
3904         int i;
3905
3906         __skb_queue_head_init(&list);
3907
3908         /*
3909          * Find the skb that holds this file in its SCM_RIGHTS. When found,
3910          * remove this entry and rearrange the file array.
3911          */
3912         skb = skb_dequeue(head);
3913         while (skb) {
3914                 struct scm_fp_list *fp;
3915
3916                 fp = UNIXCB(skb).fp;
3917                 for (i = 0; i < fp->count; i++) {
3918                         int left;
3919
3920                         if (fp->fp[i] != file)
3921                                 continue;
3922
3923                         unix_notinflight(fp->user, fp->fp[i]);
3924                         left = fp->count - 1 - i;
3925                         if (left) {
3926                                 memmove(&fp->fp[i], &fp->fp[i + 1],
3927                                                 left * sizeof(struct file *));
3928                         }
3929                         fp->count--;
3930                         if (!fp->count) {
3931                                 kfree_skb(skb);
3932                                 skb = NULL;
3933                         } else {
3934                                 __skb_queue_tail(&list, skb);
3935                         }
3936                         fput(file);
3937                         file = NULL;
3938                         break;
3939                 }
3940
3941                 if (!file)
3942                         break;
3943
3944                 __skb_queue_tail(&list, skb);
3945
3946                 skb = skb_dequeue(head);
3947         }
3948
3949         if (skb_peek(&list)) {
3950                 spin_lock_irq(&head->lock);
3951                 while ((skb = __skb_dequeue(&list)) != NULL)
3952                         __skb_queue_tail(head, skb);
3953                 spin_unlock_irq(&head->lock);
3954         }
3955 #else
3956         fput(io_file_from_index(ctx, index));
3957 #endif
3958 }
3959
3960 static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
3961                                 int index)
3962 {
3963 #if defined(CONFIG_UNIX)
3964         struct sock *sock = ctx->ring_sock->sk;
3965         struct sk_buff_head *head = &sock->sk_receive_queue;
3966         struct sk_buff *skb;
3967
3968         /*
3969          * See if we can merge this file into an existing skb SCM_RIGHTS
3970          * file set. If there's no room, fall back to allocating a new skb
3971          * and filling it in.
3972          */
3973         spin_lock_irq(&head->lock);
3974         skb = skb_peek(head);
3975         if (skb) {
3976                 struct scm_fp_list *fpl = UNIXCB(skb).fp;
3977
3978                 if (fpl->count < SCM_MAX_FD) {
3979                         __skb_unlink(skb, head);
3980                         spin_unlock_irq(&head->lock);
3981                         fpl->fp[fpl->count] = get_file(file);
3982                         unix_inflight(fpl->user, fpl->fp[fpl->count]);
3983                         fpl->count++;
3984                         spin_lock_irq(&head->lock);
3985                         __skb_queue_head(head, skb);
3986                 } else {
3987                         skb = NULL;
3988                 }
3989         }
3990         spin_unlock_irq(&head->lock);
3991
3992         if (skb) {
3993                 fput(file);
3994                 return 0;
3995         }
3996
3997         return __io_sqe_files_scm(ctx, 1, index);
3998 #else
3999         return 0;
4000 #endif
4001 }
4002
4003 static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
4004                                unsigned nr_args)
4005 {
4006         struct io_uring_files_update up;
4007         __s32 __user *fds;
4008         int fd, i, err;
4009         __u32 done;
4010
4011         if (!ctx->file_table)
4012                 return -ENXIO;
4013         if (!nr_args)
4014                 return -EINVAL;
4015         if (copy_from_user(&up, arg, sizeof(up)))
4016                 return -EFAULT;
4017         if (check_add_overflow(up.offset, nr_args, &done))
4018                 return -EOVERFLOW;
4019         if (done > ctx->nr_user_files)
4020                 return -EINVAL;
4021
4022         done = 0;
4023         fds = (__s32 __user *) up.fds;
4024         while (nr_args) {
4025                 struct fixed_file_table *table;
4026                 unsigned index;
4027
4028                 err = 0;
4029                 if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
4030                         err = -EFAULT;
4031                         break;
4032                 }
4033                 i = array_index_nospec(up.offset, ctx->nr_user_files);
4034                 table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT];
4035                 index = i & IORING_FILE_TABLE_MASK;
4036                 if (table->files[index]) {
4037                         io_sqe_file_unregister(ctx, i);
4038                         table->files[index] = NULL;
4039                 }
4040                 if (fd != -1) {
4041                         struct file *file;
4042
4043                         file = fget(fd);
4044                         if (!file) {
4045                                 err = -EBADF;
4046                                 break;
4047                         }
4048                         /*
4049                          * Don't allow io_uring instances to be registered. If
4050                          * UNIX isn't enabled, then this causes a reference
4051                          * cycle and this instance can never get freed. If UNIX
4052                          * is enabled we'll handle it just fine, but there's
4053                          * still no point in allowing a ring fd as it doesn't
4054                          * support regular read/write anyway.
4055                          */
4056                         if (file->f_op == &io_uring_fops) {
4057                                 fput(file);
4058                                 err = -EBADF;
4059                                 break;
4060                         }
4061                         table->files[index] = file;
4062                         err = io_sqe_file_register(ctx, file, i);
4063                         if (err)
4064                                 break;
4065                 }
4066                 nr_args--;
4067                 done++;
4068                 up.offset++;
4069         }
4070
4071         return done ? done : err;
4072 }
4073
4074 static void io_put_work(struct io_wq_work *work)
4075 {
4076         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4077
4078         io_put_req(req);
4079 }
4080
4081 static void io_get_work(struct io_wq_work *work)
4082 {
4083         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4084
4085         refcount_inc(&req->refs);
4086 }
4087
4088 static int io_sq_offload_start(struct io_ring_ctx *ctx,
4089                                struct io_uring_params *p)
4090 {
4091         struct io_wq_data data;
4092         unsigned concurrency;
4093         int ret;
4094
4095         init_waitqueue_head(&ctx->sqo_wait);
4096         mmgrab(current->mm);
4097         ctx->sqo_mm = current->mm;
4098
4099         if (ctx->flags & IORING_SETUP_SQPOLL) {
4100                 ret = -EPERM;
4101                 if (!capable(CAP_SYS_ADMIN))
4102                         goto err;
4103
4104                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
4105                 if (!ctx->sq_thread_idle)
4106                         ctx->sq_thread_idle = HZ;
4107
4108                 if (p->flags & IORING_SETUP_SQ_AFF) {
4109                         int cpu = p->sq_thread_cpu;
4110
4111                         ret = -EINVAL;
4112                         if (cpu >= nr_cpu_ids)
4113                                 goto err;
4114                         if (!cpu_online(cpu))
4115                                 goto err;
4116
4117                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
4118                                                         ctx, cpu,
4119                                                         "io_uring-sq");
4120                 } else {
4121                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
4122                                                         "io_uring-sq");
4123                 }
4124                 if (IS_ERR(ctx->sqo_thread)) {
4125                         ret = PTR_ERR(ctx->sqo_thread);
4126                         ctx->sqo_thread = NULL;
4127                         goto err;
4128                 }
4129                 wake_up_process(ctx->sqo_thread);
4130         } else if (p->flags & IORING_SETUP_SQ_AFF) {
4131                 /* Can't have SQ_AFF without SQPOLL */
4132                 ret = -EINVAL;
4133                 goto err;
4134         }
4135
4136         data.mm = ctx->sqo_mm;
4137         data.user = ctx->user;
4138         data.creds = ctx->creds;
4139         data.get_work = io_get_work;
4140         data.put_work = io_put_work;
4141
4142         /* Do QD, or 4 * CPUS, whatever is smallest */
4143         concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
4144         ctx->io_wq = io_wq_create(concurrency, &data);
4145         if (IS_ERR(ctx->io_wq)) {
4146                 ret = PTR_ERR(ctx->io_wq);
4147                 ctx->io_wq = NULL;
4148                 goto err;
4149         }
4150
4151         return 0;
4152 err:
4153         io_finish_async(ctx);
4154         mmdrop(ctx->sqo_mm);
4155         ctx->sqo_mm = NULL;
4156         return ret;
4157 }
4158
4159 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
4160 {
4161         atomic_long_sub(nr_pages, &user->locked_vm);
4162 }
4163
4164 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
4165 {
4166         unsigned long page_limit, cur_pages, new_pages;
4167
4168         /* Don't allow more pages than we can safely lock */
4169         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
4170
4171         do {
4172                 cur_pages = atomic_long_read(&user->locked_vm);
4173                 new_pages = cur_pages + nr_pages;
4174                 if (new_pages > page_limit)
4175                         return -ENOMEM;
4176         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
4177                                         new_pages) != cur_pages);
4178
4179         return 0;
4180 }
4181
4182 static void io_mem_free(void *ptr)
4183 {
4184         struct page *page;
4185
4186         if (!ptr)
4187                 return;
4188
4189         page = virt_to_head_page(ptr);
4190         if (put_page_testzero(page))
4191                 free_compound_page(page);
4192 }
4193
4194 static void *io_mem_alloc(size_t size)
4195 {
4196         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
4197                                 __GFP_NORETRY;
4198
4199         return (void *) __get_free_pages(gfp_flags, get_order(size));
4200 }
4201
4202 static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
4203                                 size_t *sq_offset)
4204 {
4205         struct io_rings *rings;
4206         size_t off, sq_array_size;
4207
4208         off = struct_size(rings, cqes, cq_entries);
4209         if (off == SIZE_MAX)
4210                 return SIZE_MAX;
4211
4212 #ifdef CONFIG_SMP
4213         off = ALIGN(off, SMP_CACHE_BYTES);
4214         if (off == 0)
4215                 return SIZE_MAX;
4216 #endif
4217
4218         sq_array_size = array_size(sizeof(u32), sq_entries);
4219         if (sq_array_size == SIZE_MAX)
4220                 return SIZE_MAX;
4221
4222         if (check_add_overflow(off, sq_array_size, &off))
4223                 return SIZE_MAX;
4224
4225         if (sq_offset)
4226                 *sq_offset = off;
4227
4228         return off;
4229 }
4230
4231 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
4232 {
4233         size_t pages;
4234
4235         pages = (size_t)1 << get_order(
4236                 rings_size(sq_entries, cq_entries, NULL));
4237         pages += (size_t)1 << get_order(
4238                 array_size(sizeof(struct io_uring_sqe), sq_entries));
4239
4240         return pages;
4241 }
4242
4243 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
4244 {
4245         int i, j;
4246
4247         if (!ctx->user_bufs)
4248                 return -ENXIO;
4249
4250         for (i = 0; i < ctx->nr_user_bufs; i++) {
4251                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
4252
4253                 for (j = 0; j < imu->nr_bvecs; j++)
4254                         put_user_page(imu->bvec[j].bv_page);
4255
4256                 if (ctx->account_mem)
4257                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
4258                 kvfree(imu->bvec);
4259                 imu->nr_bvecs = 0;
4260         }
4261
4262         kfree(ctx->user_bufs);
4263         ctx->user_bufs = NULL;
4264         ctx->nr_user_bufs = 0;
4265         return 0;
4266 }
4267
4268 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
4269                        void __user *arg, unsigned index)
4270 {
4271         struct iovec __user *src;
4272
4273 #ifdef CONFIG_COMPAT
4274         if (ctx->compat) {
4275                 struct compat_iovec __user *ciovs;
4276                 struct compat_iovec ciov;
4277
4278                 ciovs = (struct compat_iovec __user *) arg;
4279                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
4280                         return -EFAULT;
4281
4282                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
4283                 dst->iov_len = ciov.iov_len;
4284                 return 0;
4285         }
4286 #endif
4287         src = (struct iovec __user *) arg;
4288         if (copy_from_user(dst, &src[index], sizeof(*dst)))
4289                 return -EFAULT;
4290         return 0;
4291 }
4292
4293 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
4294                                   unsigned nr_args)
4295 {
4296         struct vm_area_struct **vmas = NULL;
4297         struct page **pages = NULL;
4298         int i, j, got_pages = 0;
4299         int ret = -EINVAL;
4300
4301         if (ctx->user_bufs)
4302                 return -EBUSY;
4303         if (!nr_args || nr_args > UIO_MAXIOV)
4304                 return -EINVAL;
4305
4306         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
4307                                         GFP_KERNEL);
4308         if (!ctx->user_bufs)
4309                 return -ENOMEM;
4310
4311         for (i = 0; i < nr_args; i++) {
4312                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
4313                 unsigned long off, start, end, ubuf;
4314                 int pret, nr_pages;
4315                 struct iovec iov;
4316                 size_t size;
4317
4318                 ret = io_copy_iov(ctx, &iov, arg, i);
4319                 if (ret)
4320                         goto err;
4321
4322                 /*
4323                  * Don't impose further limits on the size and buffer
4324                  * constraints here, we'll -EINVAL later when IO is
4325                  * submitted if they are wrong.
4326                  */
4327                 ret = -EFAULT;
4328                 if (!iov.iov_base || !iov.iov_len)
4329                         goto err;
4330
4331                 /* arbitrary limit, but we need something */
4332                 if (iov.iov_len > SZ_1G)
4333                         goto err;
4334
4335                 ubuf = (unsigned long) iov.iov_base;
4336                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
4337                 start = ubuf >> PAGE_SHIFT;
4338                 nr_pages = end - start;
4339
4340                 if (ctx->account_mem) {
4341                         ret = io_account_mem(ctx->user, nr_pages);
4342                         if (ret)
4343                                 goto err;
4344                 }
4345
4346                 ret = 0;
4347                 if (!pages || nr_pages > got_pages) {
4348                         kfree(vmas);
4349                         kfree(pages);
4350                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
4351                                                 GFP_KERNEL);
4352                         vmas = kvmalloc_array(nr_pages,
4353                                         sizeof(struct vm_area_struct *),
4354                                         GFP_KERNEL);
4355                         if (!pages || !vmas) {
4356                                 ret = -ENOMEM;
4357                                 if (ctx->account_mem)
4358                                         io_unaccount_mem(ctx->user, nr_pages);
4359                                 goto err;
4360                         }
4361                         got_pages = nr_pages;
4362                 }
4363
4364                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
4365                                                 GFP_KERNEL);
4366                 ret = -ENOMEM;
4367                 if (!imu->bvec) {
4368                         if (ctx->account_mem)
4369                                 io_unaccount_mem(ctx->user, nr_pages);
4370                         goto err;
4371                 }
4372
4373                 ret = 0;
4374                 down_read(&current->mm->mmap_sem);
4375                 pret = get_user_pages(ubuf, nr_pages,
4376                                       FOLL_WRITE | FOLL_LONGTERM,
4377                                       pages, vmas);
4378                 if (pret == nr_pages) {
4379                         /* don't support file backed memory */
4380                         for (j = 0; j < nr_pages; j++) {
4381                                 struct vm_area_struct *vma = vmas[j];
4382
4383                                 if (vma->vm_file &&
4384                                     !is_file_hugepages(vma->vm_file)) {
4385                                         ret = -EOPNOTSUPP;
4386                                         break;
4387                                 }
4388                         }
4389                 } else {
4390                         ret = pret < 0 ? pret : -EFAULT;
4391                 }
4392                 up_read(&current->mm->mmap_sem);
4393                 if (ret) {
4394                         /*
4395                          * if we did partial map, or found file backed vmas,
4396                          * release any pages we did get
4397                          */
4398                         if (pret > 0)
4399                                 put_user_pages(pages, pret);
4400                         if (ctx->account_mem)
4401                                 io_unaccount_mem(ctx->user, nr_pages);
4402                         kvfree(imu->bvec);
4403                         goto err;
4404                 }
4405
4406                 off = ubuf & ~PAGE_MASK;
4407                 size = iov.iov_len;
4408                 for (j = 0; j < nr_pages; j++) {
4409                         size_t vec_len;
4410
4411                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
4412                         imu->bvec[j].bv_page = pages[j];
4413                         imu->bvec[j].bv_len = vec_len;
4414                         imu->bvec[j].bv_offset = off;
4415                         off = 0;
4416                         size -= vec_len;
4417                 }
4418                 /* store original address for later verification */
4419                 imu->ubuf = ubuf;
4420                 imu->len = iov.iov_len;
4421                 imu->nr_bvecs = nr_pages;
4422
4423                 ctx->nr_user_bufs++;
4424         }
4425         kvfree(pages);
4426         kvfree(vmas);
4427         return 0;
4428 err:
4429         kvfree(pages);
4430         kvfree(vmas);
4431         io_sqe_buffer_unregister(ctx);
4432         return ret;
4433 }
4434
4435 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
4436 {
4437         __s32 __user *fds = arg;
4438         int fd;
4439
4440         if (ctx->cq_ev_fd)
4441                 return -EBUSY;
4442
4443         if (copy_from_user(&fd, fds, sizeof(*fds)))
4444                 return -EFAULT;
4445
4446         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
4447         if (IS_ERR(ctx->cq_ev_fd)) {
4448                 int ret = PTR_ERR(ctx->cq_ev_fd);
4449                 ctx->cq_ev_fd = NULL;
4450                 return ret;
4451         }
4452
4453         return 0;
4454 }
4455
4456 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
4457 {
4458         if (ctx->cq_ev_fd) {
4459                 eventfd_ctx_put(ctx->cq_ev_fd);
4460                 ctx->cq_ev_fd = NULL;
4461                 return 0;
4462         }
4463
4464         return -ENXIO;
4465 }
4466
4467 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
4468 {
4469         io_finish_async(ctx);
4470         if (ctx->sqo_mm)
4471                 mmdrop(ctx->sqo_mm);
4472
4473         io_iopoll_reap_events(ctx);
4474         io_sqe_buffer_unregister(ctx);
4475         io_sqe_files_unregister(ctx);
4476         io_eventfd_unregister(ctx);
4477
4478 #if defined(CONFIG_UNIX)
4479         if (ctx->ring_sock) {
4480                 ctx->ring_sock->file = NULL; /* so that iput() is called */
4481                 sock_release(ctx->ring_sock);
4482         }
4483 #endif
4484
4485         io_mem_free(ctx->rings);
4486         io_mem_free(ctx->sq_sqes);
4487
4488         percpu_ref_exit(&ctx->refs);
4489         if (ctx->account_mem)
4490                 io_unaccount_mem(ctx->user,
4491                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
4492         free_uid(ctx->user);
4493         put_cred(ctx->creds);
4494         kfree(ctx->completions);
4495         kmem_cache_free(req_cachep, ctx->fallback_req);
4496         kfree(ctx);
4497 }
4498
4499 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
4500 {
4501         struct io_ring_ctx *ctx = file->private_data;
4502         __poll_t mask = 0;
4503
4504         poll_wait(file, &ctx->cq_wait, wait);
4505         /*
4506          * synchronizes with barrier from wq_has_sleeper call in
4507          * io_commit_cqring
4508          */
4509         smp_rmb();
4510         if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
4511             ctx->rings->sq_ring_entries)
4512                 mask |= EPOLLOUT | EPOLLWRNORM;
4513         if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
4514                 mask |= EPOLLIN | EPOLLRDNORM;
4515
4516         return mask;
4517 }
4518
4519 static int io_uring_fasync(int fd, struct file *file, int on)
4520 {
4521         struct io_ring_ctx *ctx = file->private_data;
4522
4523         return fasync_helper(fd, file, on, &ctx->cq_fasync);
4524 }
4525
4526 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
4527 {
4528         mutex_lock(&ctx->uring_lock);
4529         percpu_ref_kill(&ctx->refs);
4530         mutex_unlock(&ctx->uring_lock);
4531
4532         io_kill_timeouts(ctx);
4533         io_poll_remove_all(ctx);
4534
4535         if (ctx->io_wq)
4536                 io_wq_cancel_all(ctx->io_wq);
4537
4538         io_iopoll_reap_events(ctx);
4539         /* if we failed setting up the ctx, we might not have any rings */
4540         if (ctx->rings)
4541                 io_cqring_overflow_flush(ctx, true);
4542         wait_for_completion(&ctx->completions[0]);
4543         io_ring_ctx_free(ctx);
4544 }
4545
4546 static int io_uring_release(struct inode *inode, struct file *file)
4547 {
4548         struct io_ring_ctx *ctx = file->private_data;
4549
4550         file->private_data = NULL;
4551         io_ring_ctx_wait_and_kill(ctx);
4552         return 0;
4553 }
4554
4555 static void io_uring_cancel_files(struct io_ring_ctx *ctx,
4556                                   struct files_struct *files)
4557 {
4558         struct io_kiocb *req;
4559         DEFINE_WAIT(wait);
4560
4561         while (!list_empty_careful(&ctx->inflight_list)) {
4562                 struct io_kiocb *cancel_req = NULL;
4563
4564                 spin_lock_irq(&ctx->inflight_lock);
4565                 list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
4566                         if (req->work.files != files)
4567                                 continue;
4568                         /* req is being completed, ignore */
4569                         if (!refcount_inc_not_zero(&req->refs))
4570                                 continue;
4571                         cancel_req = req;
4572                         break;
4573                 }
4574                 if (cancel_req)
4575                         prepare_to_wait(&ctx->inflight_wait, &wait,
4576                                                 TASK_UNINTERRUPTIBLE);
4577                 spin_unlock_irq(&ctx->inflight_lock);
4578
4579                 /* We need to keep going until we don't find a matching req */
4580                 if (!cancel_req)
4581                         break;
4582
4583                 io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
4584                 io_put_req(cancel_req);
4585                 schedule();
4586         }
4587         finish_wait(&ctx->inflight_wait, &wait);
4588 }
4589
4590 static int io_uring_flush(struct file *file, void *data)
4591 {
4592         struct io_ring_ctx *ctx = file->private_data;
4593
4594         io_uring_cancel_files(ctx, data);
4595         if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
4596                 io_cqring_overflow_flush(ctx, true);
4597                 io_wq_cancel_all(ctx->io_wq);
4598         }
4599         return 0;
4600 }
4601
4602 static void *io_uring_validate_mmap_request(struct file *file,
4603                                             loff_t pgoff, size_t sz)
4604 {
4605         struct io_ring_ctx *ctx = file->private_data;
4606         loff_t offset = pgoff << PAGE_SHIFT;
4607         struct page *page;
4608         void *ptr;
4609
4610         switch (offset) {
4611         case IORING_OFF_SQ_RING:
4612         case IORING_OFF_CQ_RING:
4613                 ptr = ctx->rings;
4614                 break;
4615         case IORING_OFF_SQES:
4616                 ptr = ctx->sq_sqes;
4617                 break;
4618         default:
4619                 return ERR_PTR(-EINVAL);
4620         }
4621
4622         page = virt_to_head_page(ptr);
4623         if (sz > page_size(page))
4624                 return ERR_PTR(-EINVAL);
4625
4626         return ptr;
4627 }
4628
4629 #ifdef CONFIG_MMU
4630
4631 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
4632 {
4633         size_t sz = vma->vm_end - vma->vm_start;
4634         unsigned long pfn;
4635         void *ptr;
4636
4637         ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
4638         if (IS_ERR(ptr))
4639                 return PTR_ERR(ptr);
4640
4641         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
4642         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
4643 }
4644
4645 #else /* !CONFIG_MMU */
4646
4647 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
4648 {
4649         return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
4650 }
4651
4652 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
4653 {
4654         return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
4655 }
4656
4657 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
4658         unsigned long addr, unsigned long len,
4659         unsigned long pgoff, unsigned long flags)
4660 {
4661         void *ptr;
4662
4663         ptr = io_uring_validate_mmap_request(file, pgoff, len);
4664         if (IS_ERR(ptr))
4665                 return PTR_ERR(ptr);
4666
4667         return (unsigned long) ptr;
4668 }
4669
4670 #endif /* !CONFIG_MMU */
4671
4672 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
4673                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
4674                 size_t, sigsz)
4675 {
4676         struct io_ring_ctx *ctx;
4677         long ret = -EBADF;
4678         int submitted = 0;
4679         struct fd f;
4680
4681         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
4682                 return -EINVAL;
4683
4684         f = fdget(fd);
4685         if (!f.file)
4686                 return -EBADF;
4687
4688         ret = -EOPNOTSUPP;
4689         if (f.file->f_op != &io_uring_fops)
4690                 goto out_fput;
4691
4692         ret = -ENXIO;
4693         ctx = f.file->private_data;
4694         if (!percpu_ref_tryget(&ctx->refs))
4695                 goto out_fput;
4696
4697         /*
4698          * For SQ polling, the thread will do all submissions and completions.
4699          * Just return the requested submit count, and wake the thread if
4700          * we were asked to.
4701          */
4702         ret = 0;
4703         if (ctx->flags & IORING_SETUP_SQPOLL) {
4704                 if (!list_empty_careful(&ctx->cq_overflow_list))
4705                         io_cqring_overflow_flush(ctx, false);
4706                 if (flags & IORING_ENTER_SQ_WAKEUP)
4707                         wake_up(&ctx->sqo_wait);
4708                 submitted = to_submit;
4709         } else if (to_submit) {
4710                 struct mm_struct *cur_mm;
4711
4712                 to_submit = min(to_submit, ctx->sq_entries);
4713                 mutex_lock(&ctx->uring_lock);
4714                 /* already have mm, so io_submit_sqes() won't try to grab it */
4715                 cur_mm = ctx->sqo_mm;
4716                 submitted = io_submit_sqes(ctx, to_submit, f.file, fd,
4717                                            &cur_mm, false);
4718                 mutex_unlock(&ctx->uring_lock);
4719         }
4720         if (flags & IORING_ENTER_GETEVENTS) {
4721                 unsigned nr_events = 0;
4722
4723                 min_complete = min(min_complete, ctx->cq_entries);
4724
4725                 if (ctx->flags & IORING_SETUP_IOPOLL) {
4726                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
4727                 } else {
4728                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
4729                 }
4730         }
4731
4732         percpu_ref_put(&ctx->refs);
4733 out_fput:
4734         fdput(f);
4735         return submitted ? submitted : ret;
4736 }
4737
4738 static const struct file_operations io_uring_fops = {
4739         .release        = io_uring_release,
4740         .flush          = io_uring_flush,
4741         .mmap           = io_uring_mmap,
4742 #ifndef CONFIG_MMU
4743         .get_unmapped_area = io_uring_nommu_get_unmapped_area,
4744         .mmap_capabilities = io_uring_nommu_mmap_capabilities,
4745 #endif
4746         .poll           = io_uring_poll,
4747         .fasync         = io_uring_fasync,
4748 };
4749
4750 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
4751                                   struct io_uring_params *p)
4752 {
4753         struct io_rings *rings;
4754         size_t size, sq_array_offset;
4755
4756         size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
4757         if (size == SIZE_MAX)
4758                 return -EOVERFLOW;
4759
4760         rings = io_mem_alloc(size);
4761         if (!rings)
4762                 return -ENOMEM;
4763
4764         ctx->rings = rings;
4765         ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
4766         rings->sq_ring_mask = p->sq_entries - 1;
4767         rings->cq_ring_mask = p->cq_entries - 1;
4768         rings->sq_ring_entries = p->sq_entries;
4769         rings->cq_ring_entries = p->cq_entries;
4770         ctx->sq_mask = rings->sq_ring_mask;
4771         ctx->cq_mask = rings->cq_ring_mask;
4772         ctx->sq_entries = rings->sq_ring_entries;
4773         ctx->cq_entries = rings->cq_ring_entries;
4774
4775         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
4776         if (size == SIZE_MAX) {
4777                 io_mem_free(ctx->rings);
4778                 ctx->rings = NULL;
4779                 return -EOVERFLOW;
4780         }
4781
4782         ctx->sq_sqes = io_mem_alloc(size);
4783         if (!ctx->sq_sqes) {
4784                 io_mem_free(ctx->rings);
4785                 ctx->rings = NULL;
4786                 return -ENOMEM;
4787         }
4788
4789         return 0;
4790 }
4791
4792 /*
4793  * Allocate an anonymous fd, this is what constitutes the application
4794  * visible backing of an io_uring instance. The application mmaps this
4795  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
4796  * we have to tie this fd to a socket for file garbage collection purposes.
4797  */
4798 static int io_uring_get_fd(struct io_ring_ctx *ctx)
4799 {
4800         struct file *file;
4801         int ret;
4802
4803 #if defined(CONFIG_UNIX)
4804         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
4805                                 &ctx->ring_sock);
4806         if (ret)
4807                 return ret;
4808 #endif
4809
4810         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
4811         if (ret < 0)
4812                 goto err;
4813
4814         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
4815                                         O_RDWR | O_CLOEXEC);
4816         if (IS_ERR(file)) {
4817                 put_unused_fd(ret);
4818                 ret = PTR_ERR(file);
4819                 goto err;
4820         }
4821
4822 #if defined(CONFIG_UNIX)
4823         ctx->ring_sock->file = file;
4824         ctx->ring_sock->sk->sk_user_data = ctx;
4825 #endif
4826         fd_install(ret, file);
4827         return ret;
4828 err:
4829 #if defined(CONFIG_UNIX)
4830         sock_release(ctx->ring_sock);
4831         ctx->ring_sock = NULL;
4832 #endif
4833         return ret;
4834 }
4835
4836 static int io_uring_create(unsigned entries, struct io_uring_params *p)
4837 {
4838         struct user_struct *user = NULL;
4839         struct io_ring_ctx *ctx;
4840         bool account_mem;
4841         int ret;
4842
4843         if (!entries || entries > IORING_MAX_ENTRIES)
4844                 return -EINVAL;
4845
4846         /*
4847          * Use twice as many entries for the CQ ring. It's possible for the
4848          * application to drive a higher depth than the size of the SQ ring,
4849          * since the sqes are only used at submission time. This allows for
4850          * some flexibility in overcommitting a bit. If the application has
4851          * set IORING_SETUP_CQSIZE, it will have passed in the desired number
4852          * of CQ ring entries manually.
4853          */
4854         p->sq_entries = roundup_pow_of_two(entries);
4855         if (p->flags & IORING_SETUP_CQSIZE) {
4856                 /*
4857                  * If IORING_SETUP_CQSIZE is set, we do the same roundup
4858                  * to a power-of-two, if it isn't already. We do NOT impose
4859                  * any cq vs sq ring sizing.
4860                  */
4861                 if (p->cq_entries < p->sq_entries || p->cq_entries > IORING_MAX_CQ_ENTRIES)
4862                         return -EINVAL;
4863                 p->cq_entries = roundup_pow_of_two(p->cq_entries);
4864         } else {
4865                 p->cq_entries = 2 * p->sq_entries;
4866         }
4867
4868         user = get_uid(current_user());
4869         account_mem = !capable(CAP_IPC_LOCK);
4870
4871         if (account_mem) {
4872                 ret = io_account_mem(user,
4873                                 ring_pages(p->sq_entries, p->cq_entries));
4874                 if (ret) {
4875                         free_uid(user);
4876                         return ret;
4877                 }
4878         }
4879
4880         ctx = io_ring_ctx_alloc(p);
4881         if (!ctx) {
4882                 if (account_mem)
4883                         io_unaccount_mem(user, ring_pages(p->sq_entries,
4884                                                                 p->cq_entries));
4885                 free_uid(user);
4886                 return -ENOMEM;
4887         }
4888         ctx->compat = in_compat_syscall();
4889         ctx->account_mem = account_mem;
4890         ctx->user = user;
4891         ctx->creds = get_current_cred();
4892
4893         ret = io_allocate_scq_urings(ctx, p);
4894         if (ret)
4895                 goto err;
4896
4897         ret = io_sq_offload_start(ctx, p);
4898         if (ret)
4899                 goto err;
4900
4901         memset(&p->sq_off, 0, sizeof(p->sq_off));
4902         p->sq_off.head = offsetof(struct io_rings, sq.head);
4903         p->sq_off.tail = offsetof(struct io_rings, sq.tail);
4904         p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
4905         p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
4906         p->sq_off.flags = offsetof(struct io_rings, sq_flags);
4907         p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
4908         p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
4909
4910         memset(&p->cq_off, 0, sizeof(p->cq_off));
4911         p->cq_off.head = offsetof(struct io_rings, cq.head);
4912         p->cq_off.tail = offsetof(struct io_rings, cq.tail);
4913         p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
4914         p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
4915         p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
4916         p->cq_off.cqes = offsetof(struct io_rings, cqes);
4917
4918         /*
4919          * Install ring fd as the very last thing, so we don't risk someone
4920          * having closed it before we finish setup
4921          */
4922         ret = io_uring_get_fd(ctx);
4923         if (ret < 0)
4924                 goto err;
4925
4926         p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP;
4927         trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
4928         return ret;
4929 err:
4930         io_ring_ctx_wait_and_kill(ctx);
4931         return ret;
4932 }
4933
4934 /*
4935  * Sets up an aio uring context, and returns the fd. Applications asks for a
4936  * ring size, we return the actual sq/cq ring sizes (among other things) in the
4937  * params structure passed in.
4938  */
4939 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
4940 {
4941         struct io_uring_params p;
4942         long ret;
4943         int i;
4944
4945         if (copy_from_user(&p, params, sizeof(p)))
4946                 return -EFAULT;
4947         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
4948                 if (p.resv[i])
4949                         return -EINVAL;
4950         }
4951
4952         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
4953                         IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
4954                 return -EINVAL;
4955
4956         ret = io_uring_create(entries, &p);
4957         if (ret < 0)
4958                 return ret;
4959
4960         if (copy_to_user(params, &p, sizeof(p)))
4961                 return -EFAULT;
4962
4963         return ret;
4964 }
4965
4966 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
4967                 struct io_uring_params __user *, params)
4968 {
4969         return io_uring_setup(entries, params);
4970 }
4971
4972 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
4973                                void __user *arg, unsigned nr_args)
4974         __releases(ctx->uring_lock)
4975         __acquires(ctx->uring_lock)
4976 {
4977         int ret;
4978
4979         /*
4980          * We're inside the ring mutex, if the ref is already dying, then
4981          * someone else killed the ctx or is already going through
4982          * io_uring_register().
4983          */
4984         if (percpu_ref_is_dying(&ctx->refs))
4985                 return -ENXIO;
4986
4987         percpu_ref_kill(&ctx->refs);
4988
4989         /*
4990          * Drop uring mutex before waiting for references to exit. If another
4991          * thread is currently inside io_uring_enter() it might need to grab
4992          * the uring_lock to make progress. If we hold it here across the drain
4993          * wait, then we can deadlock. It's safe to drop the mutex here, since
4994          * no new references will come in after we've killed the percpu ref.
4995          */
4996         mutex_unlock(&ctx->uring_lock);
4997         wait_for_completion(&ctx->completions[0]);
4998         mutex_lock(&ctx->uring_lock);
4999
5000         switch (opcode) {
5001         case IORING_REGISTER_BUFFERS:
5002                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
5003                 break;
5004         case IORING_UNREGISTER_BUFFERS:
5005                 ret = -EINVAL;
5006                 if (arg || nr_args)
5007                         break;
5008                 ret = io_sqe_buffer_unregister(ctx);
5009                 break;
5010         case IORING_REGISTER_FILES:
5011                 ret = io_sqe_files_register(ctx, arg, nr_args);
5012                 break;
5013         case IORING_UNREGISTER_FILES:
5014                 ret = -EINVAL;
5015                 if (arg || nr_args)
5016                         break;
5017                 ret = io_sqe_files_unregister(ctx);
5018                 break;
5019         case IORING_REGISTER_FILES_UPDATE:
5020                 ret = io_sqe_files_update(ctx, arg, nr_args);
5021                 break;
5022         case IORING_REGISTER_EVENTFD:
5023                 ret = -EINVAL;
5024                 if (nr_args != 1)
5025                         break;
5026                 ret = io_eventfd_register(ctx, arg);
5027                 break;
5028         case IORING_UNREGISTER_EVENTFD:
5029                 ret = -EINVAL;
5030                 if (arg || nr_args)
5031                         break;
5032                 ret = io_eventfd_unregister(ctx);
5033                 break;
5034         default:
5035                 ret = -EINVAL;
5036                 break;
5037         }
5038
5039         /* bring the ctx back to life */
5040         reinit_completion(&ctx->completions[0]);
5041         percpu_ref_reinit(&ctx->refs);
5042         return ret;
5043 }
5044
5045 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
5046                 void __user *, arg, unsigned int, nr_args)
5047 {
5048         struct io_ring_ctx *ctx;
5049         long ret = -EBADF;
5050         struct fd f;
5051
5052         f = fdget(fd);
5053         if (!f.file)
5054                 return -EBADF;
5055
5056         ret = -EOPNOTSUPP;
5057         if (f.file->f_op != &io_uring_fops)
5058                 goto out_fput;
5059
5060         ctx = f.file->private_data;
5061
5062         mutex_lock(&ctx->uring_lock);
5063         ret = __io_uring_register(ctx, opcode, arg, nr_args);
5064         mutex_unlock(&ctx->uring_lock);
5065         trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
5066                                                         ctx->cq_ev_fd != NULL, ret);
5067 out_fput:
5068         fdput(f);
5069         return ret;
5070 }
5071
5072 static int __init io_uring_init(void)
5073 {
5074         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
5075         return 0;
5076 };
5077 __initcall(io_uring_init);