Merge tag 'tag-chrome-platform-for-v5.3' of git://git.kernel.org/pub/scm/linux/kernel...
[linux-2.6-microblaze.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/workqueue.h>
60 #include <linux/kthread.h>
61 #include <linux/blkdev.h>
62 #include <linux/bvec.h>
63 #include <linux/net.h>
64 #include <net/sock.h>
65 #include <net/af_unix.h>
66 #include <net/scm.h>
67 #include <linux/anon_inodes.h>
68 #include <linux/sched/mm.h>
69 #include <linux/uaccess.h>
70 #include <linux/nospec.h>
71 #include <linux/sizes.h>
72 #include <linux/hugetlb.h>
73
74 #include <uapi/linux/io_uring.h>
75
76 #include "internal.h"
77
78 #define IORING_MAX_ENTRIES      4096
79 #define IORING_MAX_FIXED_FILES  1024
80
81 struct io_uring {
82         u32 head ____cacheline_aligned_in_smp;
83         u32 tail ____cacheline_aligned_in_smp;
84 };
85
86 /*
87  * This data is shared with the application through the mmap at offset
88  * IORING_OFF_SQ_RING.
89  *
90  * The offsets to the member fields are published through struct
91  * io_sqring_offsets when calling io_uring_setup.
92  */
93 struct io_sq_ring {
94         /*
95          * Head and tail offsets into the ring; the offsets need to be
96          * masked to get valid indices.
97          *
98          * The kernel controls head and the application controls tail.
99          */
100         struct io_uring         r;
101         /*
102          * Bitmask to apply to head and tail offsets (constant, equals
103          * ring_entries - 1)
104          */
105         u32                     ring_mask;
106         /* Ring size (constant, power of 2) */
107         u32                     ring_entries;
108         /*
109          * Number of invalid entries dropped by the kernel due to
110          * invalid index stored in array
111          *
112          * Written by the kernel, shouldn't be modified by the
113          * application (i.e. get number of "new events" by comparing to
114          * cached value).
115          *
116          * After a new SQ head value was read by the application this
117          * counter includes all submissions that were dropped reaching
118          * the new SQ head (and possibly more).
119          */
120         u32                     dropped;
121         /*
122          * Runtime flags
123          *
124          * Written by the kernel, shouldn't be modified by the
125          * application.
126          *
127          * The application needs a full memory barrier before checking
128          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
129          */
130         u32                     flags;
131         /*
132          * Ring buffer of indices into array of io_uring_sqe, which is
133          * mmapped by the application using the IORING_OFF_SQES offset.
134          *
135          * This indirection could e.g. be used to assign fixed
136          * io_uring_sqe entries to operations and only submit them to
137          * the queue when needed.
138          *
139          * The kernel modifies neither the indices array nor the entries
140          * array.
141          */
142         u32                     array[];
143 };
144
145 /*
146  * This data is shared with the application through the mmap at offset
147  * IORING_OFF_CQ_RING.
148  *
149  * The offsets to the member fields are published through struct
150  * io_cqring_offsets when calling io_uring_setup.
151  */
152 struct io_cq_ring {
153         /*
154          * Head and tail offsets into the ring; the offsets need to be
155          * masked to get valid indices.
156          *
157          * The application controls head and the kernel tail.
158          */
159         struct io_uring         r;
160         /*
161          * Bitmask to apply to head and tail offsets (constant, equals
162          * ring_entries - 1)
163          */
164         u32                     ring_mask;
165         /* Ring size (constant, power of 2) */
166         u32                     ring_entries;
167         /*
168          * Number of completion events lost because the queue was full;
169          * this should be avoided by the application by making sure
170          * there are not more requests pending thatn there is space in
171          * the completion queue.
172          *
173          * Written by the kernel, shouldn't be modified by the
174          * application (i.e. get number of "new events" by comparing to
175          * cached value).
176          *
177          * As completion events come in out of order this counter is not
178          * ordered with any other data.
179          */
180         u32                     overflow;
181         /*
182          * Ring buffer of completion events.
183          *
184          * The kernel writes completion events fresh every time they are
185          * produced, so the application is allowed to modify pending
186          * entries.
187          */
188         struct io_uring_cqe     cqes[];
189 };
190
191 struct io_mapped_ubuf {
192         u64             ubuf;
193         size_t          len;
194         struct          bio_vec *bvec;
195         unsigned int    nr_bvecs;
196 };
197
198 struct async_list {
199         spinlock_t              lock;
200         atomic_t                cnt;
201         struct list_head        list;
202
203         struct file             *file;
204         off_t                   io_end;
205         size_t                  io_pages;
206 };
207
208 struct io_ring_ctx {
209         struct {
210                 struct percpu_ref       refs;
211         } ____cacheline_aligned_in_smp;
212
213         struct {
214                 unsigned int            flags;
215                 bool                    compat;
216                 bool                    account_mem;
217
218                 /* SQ ring */
219                 struct io_sq_ring       *sq_ring;
220                 unsigned                cached_sq_head;
221                 unsigned                sq_entries;
222                 unsigned                sq_mask;
223                 unsigned                sq_thread_idle;
224                 struct io_uring_sqe     *sq_sqes;
225
226                 struct list_head        defer_list;
227         } ____cacheline_aligned_in_smp;
228
229         /* IO offload */
230         struct workqueue_struct *sqo_wq;
231         struct task_struct      *sqo_thread;    /* if using sq thread polling */
232         struct mm_struct        *sqo_mm;
233         wait_queue_head_t       sqo_wait;
234
235         struct {
236                 /* CQ ring */
237                 struct io_cq_ring       *cq_ring;
238                 unsigned                cached_cq_tail;
239                 unsigned                cq_entries;
240                 unsigned                cq_mask;
241                 struct wait_queue_head  cq_wait;
242                 struct fasync_struct    *cq_fasync;
243                 struct eventfd_ctx      *cq_ev_fd;
244         } ____cacheline_aligned_in_smp;
245
246         /*
247          * If used, fixed file set. Writers must ensure that ->refs is dead,
248          * readers must ensure that ->refs is alive as long as the file* is
249          * used. Only updated through io_uring_register(2).
250          */
251         struct file             **user_files;
252         unsigned                nr_user_files;
253
254         /* if used, fixed mapped user buffers */
255         unsigned                nr_user_bufs;
256         struct io_mapped_ubuf   *user_bufs;
257
258         struct user_struct      *user;
259
260         struct completion       ctx_done;
261
262         struct {
263                 struct mutex            uring_lock;
264                 wait_queue_head_t       wait;
265         } ____cacheline_aligned_in_smp;
266
267         struct {
268                 spinlock_t              completion_lock;
269                 bool                    poll_multi_file;
270                 /*
271                  * ->poll_list is protected by the ctx->uring_lock for
272                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
273                  * For SQPOLL, only the single threaded io_sq_thread() will
274                  * manipulate the list, hence no extra locking is needed there.
275                  */
276                 struct list_head        poll_list;
277                 struct list_head        cancel_list;
278         } ____cacheline_aligned_in_smp;
279
280         struct async_list       pending_async[2];
281
282 #if defined(CONFIG_UNIX)
283         struct socket           *ring_sock;
284 #endif
285 };
286
287 struct sqe_submit {
288         const struct io_uring_sqe       *sqe;
289         unsigned short                  index;
290         bool                            has_user;
291         bool                            needs_lock;
292         bool                            needs_fixed_file;
293 };
294
295 /*
296  * First field must be the file pointer in all the
297  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
298  */
299 struct io_poll_iocb {
300         struct file                     *file;
301         struct wait_queue_head          *head;
302         __poll_t                        events;
303         bool                            done;
304         bool                            canceled;
305         struct wait_queue_entry         wait;
306 };
307
308 /*
309  * NOTE! Each of the iocb union members has the file pointer
310  * as the first entry in their struct definition. So you can
311  * access the file pointer through any of the sub-structs,
312  * or directly as just 'ki_filp' in this struct.
313  */
314 struct io_kiocb {
315         union {
316                 struct file             *file;
317                 struct kiocb            rw;
318                 struct io_poll_iocb     poll;
319         };
320
321         struct sqe_submit       submit;
322
323         struct io_ring_ctx      *ctx;
324         struct list_head        list;
325         unsigned int            flags;
326         refcount_t              refs;
327 #define REQ_F_NOWAIT            1       /* must not punt to workers */
328 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
329 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
330 #define REQ_F_SEQ_PREV          8       /* sequential with previous */
331 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
332 #define REQ_F_IO_DRAINED        32      /* drain done */
333         u64                     user_data;
334         u32                     error;  /* iopoll result from callback */
335         u32                     sequence;
336
337         struct work_struct      work;
338 };
339
340 #define IO_PLUG_THRESHOLD               2
341 #define IO_IOPOLL_BATCH                 8
342
343 struct io_submit_state {
344         struct blk_plug         plug;
345
346         /*
347          * io_kiocb alloc cache
348          */
349         void                    *reqs[IO_IOPOLL_BATCH];
350         unsigned                int free_reqs;
351         unsigned                int cur_req;
352
353         /*
354          * File reference cache
355          */
356         struct file             *file;
357         unsigned int            fd;
358         unsigned int            has_refs;
359         unsigned int            used_refs;
360         unsigned int            ios_left;
361 };
362
363 static void io_sq_wq_submit_work(struct work_struct *work);
364
365 static struct kmem_cache *req_cachep;
366
367 static const struct file_operations io_uring_fops;
368
369 struct sock *io_uring_get_socket(struct file *file)
370 {
371 #if defined(CONFIG_UNIX)
372         if (file->f_op == &io_uring_fops) {
373                 struct io_ring_ctx *ctx = file->private_data;
374
375                 return ctx->ring_sock->sk;
376         }
377 #endif
378         return NULL;
379 }
380 EXPORT_SYMBOL(io_uring_get_socket);
381
382 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
383 {
384         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
385
386         complete(&ctx->ctx_done);
387 }
388
389 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
390 {
391         struct io_ring_ctx *ctx;
392         int i;
393
394         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
395         if (!ctx)
396                 return NULL;
397
398         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
399                 kfree(ctx);
400                 return NULL;
401         }
402
403         ctx->flags = p->flags;
404         init_waitqueue_head(&ctx->cq_wait);
405         init_completion(&ctx->ctx_done);
406         mutex_init(&ctx->uring_lock);
407         init_waitqueue_head(&ctx->wait);
408         for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
409                 spin_lock_init(&ctx->pending_async[i].lock);
410                 INIT_LIST_HEAD(&ctx->pending_async[i].list);
411                 atomic_set(&ctx->pending_async[i].cnt, 0);
412         }
413         spin_lock_init(&ctx->completion_lock);
414         INIT_LIST_HEAD(&ctx->poll_list);
415         INIT_LIST_HEAD(&ctx->cancel_list);
416         INIT_LIST_HEAD(&ctx->defer_list);
417         return ctx;
418 }
419
420 static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
421                                      struct io_kiocb *req)
422 {
423         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
424                 return false;
425
426         return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped;
427 }
428
429 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
430 {
431         struct io_kiocb *req;
432
433         if (list_empty(&ctx->defer_list))
434                 return NULL;
435
436         req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
437         if (!io_sequence_defer(ctx, req)) {
438                 list_del_init(&req->list);
439                 return req;
440         }
441
442         return NULL;
443 }
444
445 static void __io_commit_cqring(struct io_ring_ctx *ctx)
446 {
447         struct io_cq_ring *ring = ctx->cq_ring;
448
449         if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
450                 /* order cqe stores with ring update */
451                 smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
452
453                 if (wq_has_sleeper(&ctx->cq_wait)) {
454                         wake_up_interruptible(&ctx->cq_wait);
455                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
456                 }
457         }
458 }
459
460 static void io_commit_cqring(struct io_ring_ctx *ctx)
461 {
462         struct io_kiocb *req;
463
464         __io_commit_cqring(ctx);
465
466         while ((req = io_get_deferred_req(ctx)) != NULL) {
467                 req->flags |= REQ_F_IO_DRAINED;
468                 queue_work(ctx->sqo_wq, &req->work);
469         }
470 }
471
472 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
473 {
474         struct io_cq_ring *ring = ctx->cq_ring;
475         unsigned tail;
476
477         tail = ctx->cached_cq_tail;
478         /*
479          * writes to the cq entry need to come after reading head; the
480          * control dependency is enough as we're using WRITE_ONCE to
481          * fill the cq entry
482          */
483         if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
484                 return NULL;
485
486         ctx->cached_cq_tail++;
487         return &ring->cqes[tail & ctx->cq_mask];
488 }
489
490 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
491                                  long res)
492 {
493         struct io_uring_cqe *cqe;
494
495         /*
496          * If we can't get a cq entry, userspace overflowed the
497          * submission (by quite a lot). Increment the overflow count in
498          * the ring.
499          */
500         cqe = io_get_cqring(ctx);
501         if (cqe) {
502                 WRITE_ONCE(cqe->user_data, ki_user_data);
503                 WRITE_ONCE(cqe->res, res);
504                 WRITE_ONCE(cqe->flags, 0);
505         } else {
506                 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
507
508                 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
509         }
510 }
511
512 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
513 {
514         if (waitqueue_active(&ctx->wait))
515                 wake_up(&ctx->wait);
516         if (waitqueue_active(&ctx->sqo_wait))
517                 wake_up(&ctx->sqo_wait);
518         if (ctx->cq_ev_fd)
519                 eventfd_signal(ctx->cq_ev_fd, 1);
520 }
521
522 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
523                                 long res)
524 {
525         unsigned long flags;
526
527         spin_lock_irqsave(&ctx->completion_lock, flags);
528         io_cqring_fill_event(ctx, user_data, res);
529         io_commit_cqring(ctx);
530         spin_unlock_irqrestore(&ctx->completion_lock, flags);
531
532         io_cqring_ev_posted(ctx);
533 }
534
535 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
536 {
537         percpu_ref_put_many(&ctx->refs, refs);
538
539         if (waitqueue_active(&ctx->wait))
540                 wake_up(&ctx->wait);
541 }
542
543 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
544                                    struct io_submit_state *state)
545 {
546         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
547         struct io_kiocb *req;
548
549         if (!percpu_ref_tryget(&ctx->refs))
550                 return NULL;
551
552         if (!state) {
553                 req = kmem_cache_alloc(req_cachep, gfp);
554                 if (unlikely(!req))
555                         goto out;
556         } else if (!state->free_reqs) {
557                 size_t sz;
558                 int ret;
559
560                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
561                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
562
563                 /*
564                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
565                  * retry single alloc to be on the safe side.
566                  */
567                 if (unlikely(ret <= 0)) {
568                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
569                         if (!state->reqs[0])
570                                 goto out;
571                         ret = 1;
572                 }
573                 state->free_reqs = ret - 1;
574                 state->cur_req = 1;
575                 req = state->reqs[0];
576         } else {
577                 req = state->reqs[state->cur_req];
578                 state->free_reqs--;
579                 state->cur_req++;
580         }
581
582         req->file = NULL;
583         req->ctx = ctx;
584         req->flags = 0;
585         /* one is dropped after submission, the other at completion */
586         refcount_set(&req->refs, 2);
587         return req;
588 out:
589         io_ring_drop_ctx_refs(ctx, 1);
590         return NULL;
591 }
592
593 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
594 {
595         if (*nr) {
596                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
597                 io_ring_drop_ctx_refs(ctx, *nr);
598                 *nr = 0;
599         }
600 }
601
602 static void io_free_req(struct io_kiocb *req)
603 {
604         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
605                 fput(req->file);
606         io_ring_drop_ctx_refs(req->ctx, 1);
607         kmem_cache_free(req_cachep, req);
608 }
609
610 static void io_put_req(struct io_kiocb *req)
611 {
612         if (refcount_dec_and_test(&req->refs))
613                 io_free_req(req);
614 }
615
616 /*
617  * Find and free completed poll iocbs
618  */
619 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
620                                struct list_head *done)
621 {
622         void *reqs[IO_IOPOLL_BATCH];
623         struct io_kiocb *req;
624         int to_free;
625
626         to_free = 0;
627         while (!list_empty(done)) {
628                 req = list_first_entry(done, struct io_kiocb, list);
629                 list_del(&req->list);
630
631                 io_cqring_fill_event(ctx, req->user_data, req->error);
632                 (*nr_events)++;
633
634                 if (refcount_dec_and_test(&req->refs)) {
635                         /* If we're not using fixed files, we have to pair the
636                          * completion part with the file put. Use regular
637                          * completions for those, only batch free for fixed
638                          * file.
639                          */
640                         if (req->flags & REQ_F_FIXED_FILE) {
641                                 reqs[to_free++] = req;
642                                 if (to_free == ARRAY_SIZE(reqs))
643                                         io_free_req_many(ctx, reqs, &to_free);
644                         } else {
645                                 io_free_req(req);
646                         }
647                 }
648         }
649
650         io_commit_cqring(ctx);
651         io_free_req_many(ctx, reqs, &to_free);
652 }
653
654 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
655                         long min)
656 {
657         struct io_kiocb *req, *tmp;
658         LIST_HEAD(done);
659         bool spin;
660         int ret;
661
662         /*
663          * Only spin for completions if we don't have multiple devices hanging
664          * off our complete list, and we're under the requested amount.
665          */
666         spin = !ctx->poll_multi_file && *nr_events < min;
667
668         ret = 0;
669         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
670                 struct kiocb *kiocb = &req->rw;
671
672                 /*
673                  * Move completed entries to our local list. If we find a
674                  * request that requires polling, break out and complete
675                  * the done list first, if we have entries there.
676                  */
677                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
678                         list_move_tail(&req->list, &done);
679                         continue;
680                 }
681                 if (!list_empty(&done))
682                         break;
683
684                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
685                 if (ret < 0)
686                         break;
687
688                 if (ret && spin)
689                         spin = false;
690                 ret = 0;
691         }
692
693         if (!list_empty(&done))
694                 io_iopoll_complete(ctx, nr_events, &done);
695
696         return ret;
697 }
698
699 /*
700  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
701  * non-spinning poll check - we'll still enter the driver poll loop, but only
702  * as a non-spinning completion check.
703  */
704 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
705                                 long min)
706 {
707         while (!list_empty(&ctx->poll_list)) {
708                 int ret;
709
710                 ret = io_do_iopoll(ctx, nr_events, min);
711                 if (ret < 0)
712                         return ret;
713                 if (!min || *nr_events >= min)
714                         return 0;
715         }
716
717         return 1;
718 }
719
720 /*
721  * We can't just wait for polled events to come to us, we have to actively
722  * find and complete them.
723  */
724 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
725 {
726         if (!(ctx->flags & IORING_SETUP_IOPOLL))
727                 return;
728
729         mutex_lock(&ctx->uring_lock);
730         while (!list_empty(&ctx->poll_list)) {
731                 unsigned int nr_events = 0;
732
733                 io_iopoll_getevents(ctx, &nr_events, 1);
734         }
735         mutex_unlock(&ctx->uring_lock);
736 }
737
738 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
739                            long min)
740 {
741         int ret = 0;
742
743         do {
744                 int tmin = 0;
745
746                 if (*nr_events < min)
747                         tmin = min - *nr_events;
748
749                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
750                 if (ret <= 0)
751                         break;
752                 ret = 0;
753         } while (min && !*nr_events && !need_resched());
754
755         return ret;
756 }
757
758 static void kiocb_end_write(struct kiocb *kiocb)
759 {
760         if (kiocb->ki_flags & IOCB_WRITE) {
761                 struct inode *inode = file_inode(kiocb->ki_filp);
762
763                 /*
764                  * Tell lockdep we inherited freeze protection from submission
765                  * thread.
766                  */
767                 if (S_ISREG(inode->i_mode))
768                         __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
769                 file_end_write(kiocb->ki_filp);
770         }
771 }
772
773 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
774 {
775         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
776
777         kiocb_end_write(kiocb);
778
779         io_cqring_add_event(req->ctx, req->user_data, res);
780         io_put_req(req);
781 }
782
783 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
784 {
785         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
786
787         kiocb_end_write(kiocb);
788
789         req->error = res;
790         if (res != -EAGAIN)
791                 req->flags |= REQ_F_IOPOLL_COMPLETED;
792 }
793
794 /*
795  * After the iocb has been issued, it's safe to be found on the poll list.
796  * Adding the kiocb to the list AFTER submission ensures that we don't
797  * find it from a io_iopoll_getevents() thread before the issuer is done
798  * accessing the kiocb cookie.
799  */
800 static void io_iopoll_req_issued(struct io_kiocb *req)
801 {
802         struct io_ring_ctx *ctx = req->ctx;
803
804         /*
805          * Track whether we have multiple files in our lists. This will impact
806          * how we do polling eventually, not spinning if we're on potentially
807          * different devices.
808          */
809         if (list_empty(&ctx->poll_list)) {
810                 ctx->poll_multi_file = false;
811         } else if (!ctx->poll_multi_file) {
812                 struct io_kiocb *list_req;
813
814                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
815                                                 list);
816                 if (list_req->rw.ki_filp != req->rw.ki_filp)
817                         ctx->poll_multi_file = true;
818         }
819
820         /*
821          * For fast devices, IO may have already completed. If it has, add
822          * it to the front so we find it first.
823          */
824         if (req->flags & REQ_F_IOPOLL_COMPLETED)
825                 list_add(&req->list, &ctx->poll_list);
826         else
827                 list_add_tail(&req->list, &ctx->poll_list);
828 }
829
830 static void io_file_put(struct io_submit_state *state)
831 {
832         if (state->file) {
833                 int diff = state->has_refs - state->used_refs;
834
835                 if (diff)
836                         fput_many(state->file, diff);
837                 state->file = NULL;
838         }
839 }
840
841 /*
842  * Get as many references to a file as we have IOs left in this submission,
843  * assuming most submissions are for one file, or at least that each file
844  * has more than one submission.
845  */
846 static struct file *io_file_get(struct io_submit_state *state, int fd)
847 {
848         if (!state)
849                 return fget(fd);
850
851         if (state->file) {
852                 if (state->fd == fd) {
853                         state->used_refs++;
854                         state->ios_left--;
855                         return state->file;
856                 }
857                 io_file_put(state);
858         }
859         state->file = fget_many(fd, state->ios_left);
860         if (!state->file)
861                 return NULL;
862
863         state->fd = fd;
864         state->has_refs = state->ios_left;
865         state->used_refs = 1;
866         state->ios_left--;
867         return state->file;
868 }
869
870 /*
871  * If we tracked the file through the SCM inflight mechanism, we could support
872  * any file. For now, just ensure that anything potentially problematic is done
873  * inline.
874  */
875 static bool io_file_supports_async(struct file *file)
876 {
877         umode_t mode = file_inode(file)->i_mode;
878
879         if (S_ISBLK(mode) || S_ISCHR(mode))
880                 return true;
881         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
882                 return true;
883
884         return false;
885 }
886
887 static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
888                       bool force_nonblock)
889 {
890         const struct io_uring_sqe *sqe = s->sqe;
891         struct io_ring_ctx *ctx = req->ctx;
892         struct kiocb *kiocb = &req->rw;
893         unsigned ioprio;
894         int ret;
895
896         if (!req->file)
897                 return -EBADF;
898
899         if (force_nonblock && !io_file_supports_async(req->file))
900                 force_nonblock = false;
901
902         kiocb->ki_pos = READ_ONCE(sqe->off);
903         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
904         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
905
906         ioprio = READ_ONCE(sqe->ioprio);
907         if (ioprio) {
908                 ret = ioprio_check_cap(ioprio);
909                 if (ret)
910                         return ret;
911
912                 kiocb->ki_ioprio = ioprio;
913         } else
914                 kiocb->ki_ioprio = get_current_ioprio();
915
916         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
917         if (unlikely(ret))
918                 return ret;
919
920         /* don't allow async punt if RWF_NOWAIT was requested */
921         if (kiocb->ki_flags & IOCB_NOWAIT)
922                 req->flags |= REQ_F_NOWAIT;
923
924         if (force_nonblock)
925                 kiocb->ki_flags |= IOCB_NOWAIT;
926
927         if (ctx->flags & IORING_SETUP_IOPOLL) {
928                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
929                     !kiocb->ki_filp->f_op->iopoll)
930                         return -EOPNOTSUPP;
931
932                 req->error = 0;
933                 kiocb->ki_flags |= IOCB_HIPRI;
934                 kiocb->ki_complete = io_complete_rw_iopoll;
935         } else {
936                 if (kiocb->ki_flags & IOCB_HIPRI)
937                         return -EINVAL;
938                 kiocb->ki_complete = io_complete_rw;
939         }
940         return 0;
941 }
942
943 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
944 {
945         switch (ret) {
946         case -EIOCBQUEUED:
947                 break;
948         case -ERESTARTSYS:
949         case -ERESTARTNOINTR:
950         case -ERESTARTNOHAND:
951         case -ERESTART_RESTARTBLOCK:
952                 /*
953                  * We can't just restart the syscall, since previously
954                  * submitted sqes may already be in progress. Just fail this
955                  * IO with EINTR.
956                  */
957                 ret = -EINTR;
958                 /* fall through */
959         default:
960                 kiocb->ki_complete(kiocb, ret, 0);
961         }
962 }
963
964 static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
965                            const struct io_uring_sqe *sqe,
966                            struct iov_iter *iter)
967 {
968         size_t len = READ_ONCE(sqe->len);
969         struct io_mapped_ubuf *imu;
970         unsigned index, buf_index;
971         size_t offset;
972         u64 buf_addr;
973
974         /* attempt to use fixed buffers without having provided iovecs */
975         if (unlikely(!ctx->user_bufs))
976                 return -EFAULT;
977
978         buf_index = READ_ONCE(sqe->buf_index);
979         if (unlikely(buf_index >= ctx->nr_user_bufs))
980                 return -EFAULT;
981
982         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
983         imu = &ctx->user_bufs[index];
984         buf_addr = READ_ONCE(sqe->addr);
985
986         /* overflow */
987         if (buf_addr + len < buf_addr)
988                 return -EFAULT;
989         /* not inside the mapped region */
990         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
991                 return -EFAULT;
992
993         /*
994          * May not be a start of buffer, set size appropriately
995          * and advance us to the beginning.
996          */
997         offset = buf_addr - imu->ubuf;
998         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
999         if (offset)
1000                 iov_iter_advance(iter, offset);
1001         return 0;
1002 }
1003
1004 static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
1005                            const struct sqe_submit *s, struct iovec **iovec,
1006                            struct iov_iter *iter)
1007 {
1008         const struct io_uring_sqe *sqe = s->sqe;
1009         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1010         size_t sqe_len = READ_ONCE(sqe->len);
1011         u8 opcode;
1012
1013         /*
1014          * We're reading ->opcode for the second time, but the first read
1015          * doesn't care whether it's _FIXED or not, so it doesn't matter
1016          * whether ->opcode changes concurrently. The first read does care
1017          * about whether it is a READ or a WRITE, so we don't trust this read
1018          * for that purpose and instead let the caller pass in the read/write
1019          * flag.
1020          */
1021         opcode = READ_ONCE(sqe->opcode);
1022         if (opcode == IORING_OP_READ_FIXED ||
1023             opcode == IORING_OP_WRITE_FIXED) {
1024                 int ret = io_import_fixed(ctx, rw, sqe, iter);
1025                 *iovec = NULL;
1026                 return ret;
1027         }
1028
1029         if (!s->has_user)
1030                 return -EFAULT;
1031
1032 #ifdef CONFIG_COMPAT
1033         if (ctx->compat)
1034                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1035                                                 iovec, iter);
1036 #endif
1037
1038         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1039 }
1040
1041 /*
1042  * Make a note of the last file/offset/direction we punted to async
1043  * context. We'll use this information to see if we can piggy back a
1044  * sequential request onto the previous one, if it's still hasn't been
1045  * completed by the async worker.
1046  */
1047 static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
1048 {
1049         struct async_list *async_list = &req->ctx->pending_async[rw];
1050         struct kiocb *kiocb = &req->rw;
1051         struct file *filp = kiocb->ki_filp;
1052         off_t io_end = kiocb->ki_pos + len;
1053
1054         if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
1055                 unsigned long max_pages;
1056
1057                 /* Use 8x RA size as a decent limiter for both reads/writes */
1058                 max_pages = filp->f_ra.ra_pages;
1059                 if (!max_pages)
1060                         max_pages = VM_READAHEAD_PAGES;
1061                 max_pages *= 8;
1062
1063                 /* If max pages are exceeded, reset the state */
1064                 len >>= PAGE_SHIFT;
1065                 if (async_list->io_pages + len <= max_pages) {
1066                         req->flags |= REQ_F_SEQ_PREV;
1067                         async_list->io_pages += len;
1068                 } else {
1069                         io_end = 0;
1070                         async_list->io_pages = 0;
1071                 }
1072         }
1073
1074         /* New file? Reset state. */
1075         if (async_list->file != filp) {
1076                 async_list->io_pages = 0;
1077                 async_list->file = filp;
1078         }
1079         async_list->io_end = io_end;
1080 }
1081
1082 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
1083                    bool force_nonblock)
1084 {
1085         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1086         struct kiocb *kiocb = &req->rw;
1087         struct iov_iter iter;
1088         struct file *file;
1089         size_t iov_count;
1090         int ret;
1091
1092         ret = io_prep_rw(req, s, force_nonblock);
1093         if (ret)
1094                 return ret;
1095         file = kiocb->ki_filp;
1096
1097         if (unlikely(!(file->f_mode & FMODE_READ)))
1098                 return -EBADF;
1099         if (unlikely(!file->f_op->read_iter))
1100                 return -EINVAL;
1101
1102         ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
1103         if (ret)
1104                 return ret;
1105
1106         iov_count = iov_iter_count(&iter);
1107         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1108         if (!ret) {
1109                 ssize_t ret2;
1110
1111                 /* Catch -EAGAIN return for forced non-blocking submission */
1112                 ret2 = call_read_iter(file, kiocb, &iter);
1113                 if (!force_nonblock || ret2 != -EAGAIN) {
1114                         io_rw_done(kiocb, ret2);
1115                 } else {
1116                         /*
1117                          * If ->needs_lock is true, we're already in async
1118                          * context.
1119                          */
1120                         if (!s->needs_lock)
1121                                 io_async_list_note(READ, req, iov_count);
1122                         ret = -EAGAIN;
1123                 }
1124         }
1125         kfree(iovec);
1126         return ret;
1127 }
1128
1129 static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
1130                     bool force_nonblock)
1131 {
1132         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1133         struct kiocb *kiocb = &req->rw;
1134         struct iov_iter iter;
1135         struct file *file;
1136         size_t iov_count;
1137         int ret;
1138
1139         ret = io_prep_rw(req, s, force_nonblock);
1140         if (ret)
1141                 return ret;
1142
1143         file = kiocb->ki_filp;
1144         if (unlikely(!(file->f_mode & FMODE_WRITE)))
1145                 return -EBADF;
1146         if (unlikely(!file->f_op->write_iter))
1147                 return -EINVAL;
1148
1149         ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1150         if (ret)
1151                 return ret;
1152
1153         iov_count = iov_iter_count(&iter);
1154
1155         ret = -EAGAIN;
1156         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1157                 /* If ->needs_lock is true, we're already in async context. */
1158                 if (!s->needs_lock)
1159                         io_async_list_note(WRITE, req, iov_count);
1160                 goto out_free;
1161         }
1162
1163         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1164         if (!ret) {
1165                 ssize_t ret2;
1166
1167                 /*
1168                  * Open-code file_start_write here to grab freeze protection,
1169                  * which will be released by another thread in
1170                  * io_complete_rw().  Fool lockdep by telling it the lock got
1171                  * released so that it doesn't complain about the held lock when
1172                  * we return to userspace.
1173                  */
1174                 if (S_ISREG(file_inode(file)->i_mode)) {
1175                         __sb_start_write(file_inode(file)->i_sb,
1176                                                 SB_FREEZE_WRITE, true);
1177                         __sb_writers_release(file_inode(file)->i_sb,
1178                                                 SB_FREEZE_WRITE);
1179                 }
1180                 kiocb->ki_flags |= IOCB_WRITE;
1181
1182                 ret2 = call_write_iter(file, kiocb, &iter);
1183                 if (!force_nonblock || ret2 != -EAGAIN) {
1184                         io_rw_done(kiocb, ret2);
1185                 } else {
1186                         /*
1187                          * If ->needs_lock is true, we're already in async
1188                          * context.
1189                          */
1190                         if (!s->needs_lock)
1191                                 io_async_list_note(WRITE, req, iov_count);
1192                         ret = -EAGAIN;
1193                 }
1194         }
1195 out_free:
1196         kfree(iovec);
1197         return ret;
1198 }
1199
1200 /*
1201  * IORING_OP_NOP just posts a completion event, nothing else.
1202  */
1203 static int io_nop(struct io_kiocb *req, u64 user_data)
1204 {
1205         struct io_ring_ctx *ctx = req->ctx;
1206         long err = 0;
1207
1208         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1209                 return -EINVAL;
1210
1211         io_cqring_add_event(ctx, user_data, err);
1212         io_put_req(req);
1213         return 0;
1214 }
1215
1216 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1217 {
1218         struct io_ring_ctx *ctx = req->ctx;
1219
1220         if (!req->file)
1221                 return -EBADF;
1222
1223         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1224                 return -EINVAL;
1225         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1226                 return -EINVAL;
1227
1228         return 0;
1229 }
1230
1231 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1232                     bool force_nonblock)
1233 {
1234         loff_t sqe_off = READ_ONCE(sqe->off);
1235         loff_t sqe_len = READ_ONCE(sqe->len);
1236         loff_t end = sqe_off + sqe_len;
1237         unsigned fsync_flags;
1238         int ret;
1239
1240         fsync_flags = READ_ONCE(sqe->fsync_flags);
1241         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1242                 return -EINVAL;
1243
1244         ret = io_prep_fsync(req, sqe);
1245         if (ret)
1246                 return ret;
1247
1248         /* fsync always requires a blocking context */
1249         if (force_nonblock)
1250                 return -EAGAIN;
1251
1252         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1253                                 end > 0 ? end : LLONG_MAX,
1254                                 fsync_flags & IORING_FSYNC_DATASYNC);
1255
1256         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1257         io_put_req(req);
1258         return 0;
1259 }
1260
1261 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1262 {
1263         struct io_ring_ctx *ctx = req->ctx;
1264         int ret = 0;
1265
1266         if (!req->file)
1267                 return -EBADF;
1268
1269         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1270                 return -EINVAL;
1271         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1272                 return -EINVAL;
1273
1274         return ret;
1275 }
1276
1277 static int io_sync_file_range(struct io_kiocb *req,
1278                               const struct io_uring_sqe *sqe,
1279                               bool force_nonblock)
1280 {
1281         loff_t sqe_off;
1282         loff_t sqe_len;
1283         unsigned flags;
1284         int ret;
1285
1286         ret = io_prep_sfr(req, sqe);
1287         if (ret)
1288                 return ret;
1289
1290         /* sync_file_range always requires a blocking context */
1291         if (force_nonblock)
1292                 return -EAGAIN;
1293
1294         sqe_off = READ_ONCE(sqe->off);
1295         sqe_len = READ_ONCE(sqe->len);
1296         flags = READ_ONCE(sqe->sync_range_flags);
1297
1298         ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1299
1300         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1301         io_put_req(req);
1302         return 0;
1303 }
1304
1305 static void io_poll_remove_one(struct io_kiocb *req)
1306 {
1307         struct io_poll_iocb *poll = &req->poll;
1308
1309         spin_lock(&poll->head->lock);
1310         WRITE_ONCE(poll->canceled, true);
1311         if (!list_empty(&poll->wait.entry)) {
1312                 list_del_init(&poll->wait.entry);
1313                 queue_work(req->ctx->sqo_wq, &req->work);
1314         }
1315         spin_unlock(&poll->head->lock);
1316
1317         list_del_init(&req->list);
1318 }
1319
1320 static void io_poll_remove_all(struct io_ring_ctx *ctx)
1321 {
1322         struct io_kiocb *req;
1323
1324         spin_lock_irq(&ctx->completion_lock);
1325         while (!list_empty(&ctx->cancel_list)) {
1326                 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1327                 io_poll_remove_one(req);
1328         }
1329         spin_unlock_irq(&ctx->completion_lock);
1330 }
1331
1332 /*
1333  * Find a running poll command that matches one specified in sqe->addr,
1334  * and remove it if found.
1335  */
1336 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1337 {
1338         struct io_ring_ctx *ctx = req->ctx;
1339         struct io_kiocb *poll_req, *next;
1340         int ret = -ENOENT;
1341
1342         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1343                 return -EINVAL;
1344         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1345             sqe->poll_events)
1346                 return -EINVAL;
1347
1348         spin_lock_irq(&ctx->completion_lock);
1349         list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1350                 if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1351                         io_poll_remove_one(poll_req);
1352                         ret = 0;
1353                         break;
1354                 }
1355         }
1356         spin_unlock_irq(&ctx->completion_lock);
1357
1358         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1359         io_put_req(req);
1360         return 0;
1361 }
1362
1363 static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1364                              __poll_t mask)
1365 {
1366         req->poll.done = true;
1367         io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
1368         io_commit_cqring(ctx);
1369 }
1370
1371 static void io_poll_complete_work(struct work_struct *work)
1372 {
1373         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1374         struct io_poll_iocb *poll = &req->poll;
1375         struct poll_table_struct pt = { ._key = poll->events };
1376         struct io_ring_ctx *ctx = req->ctx;
1377         __poll_t mask = 0;
1378
1379         if (!READ_ONCE(poll->canceled))
1380                 mask = vfs_poll(poll->file, &pt) & poll->events;
1381
1382         /*
1383          * Note that ->ki_cancel callers also delete iocb from active_reqs after
1384          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
1385          * synchronize with them.  In the cancellation case the list_del_init
1386          * itself is not actually needed, but harmless so we keep it in to
1387          * avoid further branches in the fast path.
1388          */
1389         spin_lock_irq(&ctx->completion_lock);
1390         if (!mask && !READ_ONCE(poll->canceled)) {
1391                 add_wait_queue(poll->head, &poll->wait);
1392                 spin_unlock_irq(&ctx->completion_lock);
1393                 return;
1394         }
1395         list_del_init(&req->list);
1396         io_poll_complete(ctx, req, mask);
1397         spin_unlock_irq(&ctx->completion_lock);
1398
1399         io_cqring_ev_posted(ctx);
1400         io_put_req(req);
1401 }
1402
1403 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1404                         void *key)
1405 {
1406         struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1407                                                         wait);
1408         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1409         struct io_ring_ctx *ctx = req->ctx;
1410         __poll_t mask = key_to_poll(key);
1411         unsigned long flags;
1412
1413         /* for instances that support it check for an event match first: */
1414         if (mask && !(mask & poll->events))
1415                 return 0;
1416
1417         list_del_init(&poll->wait.entry);
1418
1419         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1420                 list_del(&req->list);
1421                 io_poll_complete(ctx, req, mask);
1422                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1423
1424                 io_cqring_ev_posted(ctx);
1425                 io_put_req(req);
1426         } else {
1427                 queue_work(ctx->sqo_wq, &req->work);
1428         }
1429
1430         return 1;
1431 }
1432
1433 struct io_poll_table {
1434         struct poll_table_struct pt;
1435         struct io_kiocb *req;
1436         int error;
1437 };
1438
1439 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1440                                struct poll_table_struct *p)
1441 {
1442         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1443
1444         if (unlikely(pt->req->poll.head)) {
1445                 pt->error = -EINVAL;
1446                 return;
1447         }
1448
1449         pt->error = 0;
1450         pt->req->poll.head = head;
1451         add_wait_queue(head, &pt->req->poll.wait);
1452 }
1453
1454 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1455 {
1456         struct io_poll_iocb *poll = &req->poll;
1457         struct io_ring_ctx *ctx = req->ctx;
1458         struct io_poll_table ipt;
1459         bool cancel = false;
1460         __poll_t mask;
1461         u16 events;
1462
1463         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1464                 return -EINVAL;
1465         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1466                 return -EINVAL;
1467         if (!poll->file)
1468                 return -EBADF;
1469
1470         INIT_WORK(&req->work, io_poll_complete_work);
1471         events = READ_ONCE(sqe->poll_events);
1472         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1473
1474         poll->head = NULL;
1475         poll->done = false;
1476         poll->canceled = false;
1477
1478         ipt.pt._qproc = io_poll_queue_proc;
1479         ipt.pt._key = poll->events;
1480         ipt.req = req;
1481         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1482
1483         /* initialized the list so that we can do list_empty checks */
1484         INIT_LIST_HEAD(&poll->wait.entry);
1485         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1486
1487         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1488
1489         spin_lock_irq(&ctx->completion_lock);
1490         if (likely(poll->head)) {
1491                 spin_lock(&poll->head->lock);
1492                 if (unlikely(list_empty(&poll->wait.entry))) {
1493                         if (ipt.error)
1494                                 cancel = true;
1495                         ipt.error = 0;
1496                         mask = 0;
1497                 }
1498                 if (mask || ipt.error)
1499                         list_del_init(&poll->wait.entry);
1500                 else if (cancel)
1501                         WRITE_ONCE(poll->canceled, true);
1502                 else if (!poll->done) /* actually waiting for an event */
1503                         list_add_tail(&req->list, &ctx->cancel_list);
1504                 spin_unlock(&poll->head->lock);
1505         }
1506         if (mask) { /* no async, we'd stolen it */
1507                 ipt.error = 0;
1508                 io_poll_complete(ctx, req, mask);
1509         }
1510         spin_unlock_irq(&ctx->completion_lock);
1511
1512         if (mask) {
1513                 io_cqring_ev_posted(ctx);
1514                 io_put_req(req);
1515         }
1516         return ipt.error;
1517 }
1518
1519 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
1520                         const struct io_uring_sqe *sqe)
1521 {
1522         struct io_uring_sqe *sqe_copy;
1523
1524         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
1525                 return 0;
1526
1527         sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1528         if (!sqe_copy)
1529                 return -EAGAIN;
1530
1531         spin_lock_irq(&ctx->completion_lock);
1532         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
1533                 spin_unlock_irq(&ctx->completion_lock);
1534                 kfree(sqe_copy);
1535                 return 0;
1536         }
1537
1538         memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
1539         req->submit.sqe = sqe_copy;
1540
1541         INIT_WORK(&req->work, io_sq_wq_submit_work);
1542         list_add_tail(&req->list, &ctx->defer_list);
1543         spin_unlock_irq(&ctx->completion_lock);
1544         return -EIOCBQUEUED;
1545 }
1546
1547 static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1548                            const struct sqe_submit *s, bool force_nonblock)
1549 {
1550         int ret, opcode;
1551
1552         if (unlikely(s->index >= ctx->sq_entries))
1553                 return -EINVAL;
1554         req->user_data = READ_ONCE(s->sqe->user_data);
1555
1556         opcode = READ_ONCE(s->sqe->opcode);
1557         switch (opcode) {
1558         case IORING_OP_NOP:
1559                 ret = io_nop(req, req->user_data);
1560                 break;
1561         case IORING_OP_READV:
1562                 if (unlikely(s->sqe->buf_index))
1563                         return -EINVAL;
1564                 ret = io_read(req, s, force_nonblock);
1565                 break;
1566         case IORING_OP_WRITEV:
1567                 if (unlikely(s->sqe->buf_index))
1568                         return -EINVAL;
1569                 ret = io_write(req, s, force_nonblock);
1570                 break;
1571         case IORING_OP_READ_FIXED:
1572                 ret = io_read(req, s, force_nonblock);
1573                 break;
1574         case IORING_OP_WRITE_FIXED:
1575                 ret = io_write(req, s, force_nonblock);
1576                 break;
1577         case IORING_OP_FSYNC:
1578                 ret = io_fsync(req, s->sqe, force_nonblock);
1579                 break;
1580         case IORING_OP_POLL_ADD:
1581                 ret = io_poll_add(req, s->sqe);
1582                 break;
1583         case IORING_OP_POLL_REMOVE:
1584                 ret = io_poll_remove(req, s->sqe);
1585                 break;
1586         case IORING_OP_SYNC_FILE_RANGE:
1587                 ret = io_sync_file_range(req, s->sqe, force_nonblock);
1588                 break;
1589         default:
1590                 ret = -EINVAL;
1591                 break;
1592         }
1593
1594         if (ret)
1595                 return ret;
1596
1597         if (ctx->flags & IORING_SETUP_IOPOLL) {
1598                 if (req->error == -EAGAIN)
1599                         return -EAGAIN;
1600
1601                 /* workqueue context doesn't hold uring_lock, grab it now */
1602                 if (s->needs_lock)
1603                         mutex_lock(&ctx->uring_lock);
1604                 io_iopoll_req_issued(req);
1605                 if (s->needs_lock)
1606                         mutex_unlock(&ctx->uring_lock);
1607         }
1608
1609         return 0;
1610 }
1611
1612 static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
1613                                                  const struct io_uring_sqe *sqe)
1614 {
1615         switch (sqe->opcode) {
1616         case IORING_OP_READV:
1617         case IORING_OP_READ_FIXED:
1618                 return &ctx->pending_async[READ];
1619         case IORING_OP_WRITEV:
1620         case IORING_OP_WRITE_FIXED:
1621                 return &ctx->pending_async[WRITE];
1622         default:
1623                 return NULL;
1624         }
1625 }
1626
1627 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1628 {
1629         u8 opcode = READ_ONCE(sqe->opcode);
1630
1631         return !(opcode == IORING_OP_READ_FIXED ||
1632                  opcode == IORING_OP_WRITE_FIXED);
1633 }
1634
1635 static void io_sq_wq_submit_work(struct work_struct *work)
1636 {
1637         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1638         struct io_ring_ctx *ctx = req->ctx;
1639         struct mm_struct *cur_mm = NULL;
1640         struct async_list *async_list;
1641         LIST_HEAD(req_list);
1642         mm_segment_t old_fs;
1643         int ret;
1644
1645         async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
1646 restart:
1647         do {
1648                 struct sqe_submit *s = &req->submit;
1649                 const struct io_uring_sqe *sqe = s->sqe;
1650
1651                 /* Ensure we clear previously set non-block flag */
1652                 req->rw.ki_flags &= ~IOCB_NOWAIT;
1653
1654                 ret = 0;
1655                 if (io_sqe_needs_user(sqe) && !cur_mm) {
1656                         if (!mmget_not_zero(ctx->sqo_mm)) {
1657                                 ret = -EFAULT;
1658                         } else {
1659                                 cur_mm = ctx->sqo_mm;
1660                                 use_mm(cur_mm);
1661                                 old_fs = get_fs();
1662                                 set_fs(USER_DS);
1663                         }
1664                 }
1665
1666                 if (!ret) {
1667                         s->has_user = cur_mm != NULL;
1668                         s->needs_lock = true;
1669                         do {
1670                                 ret = __io_submit_sqe(ctx, req, s, false);
1671                                 /*
1672                                  * We can get EAGAIN for polled IO even though
1673                                  * we're forcing a sync submission from here,
1674                                  * since we can't wait for request slots on the
1675                                  * block side.
1676                                  */
1677                                 if (ret != -EAGAIN)
1678                                         break;
1679                                 cond_resched();
1680                         } while (1);
1681                 }
1682
1683                 /* drop submission reference */
1684                 io_put_req(req);
1685
1686                 if (ret) {
1687                         io_cqring_add_event(ctx, sqe->user_data, ret);
1688                         io_put_req(req);
1689                 }
1690
1691                 /* async context always use a copy of the sqe */
1692                 kfree(sqe);
1693
1694                 if (!async_list)
1695                         break;
1696                 if (!list_empty(&req_list)) {
1697                         req = list_first_entry(&req_list, struct io_kiocb,
1698                                                 list);
1699                         list_del(&req->list);
1700                         continue;
1701                 }
1702                 if (list_empty(&async_list->list))
1703                         break;
1704
1705                 req = NULL;
1706                 spin_lock(&async_list->lock);
1707                 if (list_empty(&async_list->list)) {
1708                         spin_unlock(&async_list->lock);
1709                         break;
1710                 }
1711                 list_splice_init(&async_list->list, &req_list);
1712                 spin_unlock(&async_list->lock);
1713
1714                 req = list_first_entry(&req_list, struct io_kiocb, list);
1715                 list_del(&req->list);
1716         } while (req);
1717
1718         /*
1719          * Rare case of racing with a submitter. If we find the count has
1720          * dropped to zero AND we have pending work items, then restart
1721          * the processing. This is a tiny race window.
1722          */
1723         if (async_list) {
1724                 ret = atomic_dec_return(&async_list->cnt);
1725                 while (!ret && !list_empty(&async_list->list)) {
1726                         spin_lock(&async_list->lock);
1727                         atomic_inc(&async_list->cnt);
1728                         list_splice_init(&async_list->list, &req_list);
1729                         spin_unlock(&async_list->lock);
1730
1731                         if (!list_empty(&req_list)) {
1732                                 req = list_first_entry(&req_list,
1733                                                         struct io_kiocb, list);
1734                                 list_del(&req->list);
1735                                 goto restart;
1736                         }
1737                         ret = atomic_dec_return(&async_list->cnt);
1738                 }
1739         }
1740
1741         if (cur_mm) {
1742                 set_fs(old_fs);
1743                 unuse_mm(cur_mm);
1744                 mmput(cur_mm);
1745         }
1746 }
1747
1748 /*
1749  * See if we can piggy back onto previously submitted work, that is still
1750  * running. We currently only allow this if the new request is sequential
1751  * to the previous one we punted.
1752  */
1753 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
1754 {
1755         bool ret = false;
1756
1757         if (!list)
1758                 return false;
1759         if (!(req->flags & REQ_F_SEQ_PREV))
1760                 return false;
1761         if (!atomic_read(&list->cnt))
1762                 return false;
1763
1764         ret = true;
1765         spin_lock(&list->lock);
1766         list_add_tail(&req->list, &list->list);
1767         if (!atomic_read(&list->cnt)) {
1768                 list_del_init(&req->list);
1769                 ret = false;
1770         }
1771         spin_unlock(&list->lock);
1772         return ret;
1773 }
1774
1775 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
1776 {
1777         int op = READ_ONCE(sqe->opcode);
1778
1779         switch (op) {
1780         case IORING_OP_NOP:
1781         case IORING_OP_POLL_REMOVE:
1782                 return false;
1783         default:
1784                 return true;
1785         }
1786 }
1787
1788 static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
1789                            struct io_submit_state *state, struct io_kiocb *req)
1790 {
1791         unsigned flags;
1792         int fd;
1793
1794         flags = READ_ONCE(s->sqe->flags);
1795         fd = READ_ONCE(s->sqe->fd);
1796
1797         if (flags & IOSQE_IO_DRAIN) {
1798                 req->flags |= REQ_F_IO_DRAIN;
1799                 req->sequence = ctx->cached_sq_head - 1;
1800         }
1801
1802         if (!io_op_needs_file(s->sqe))
1803                 return 0;
1804
1805         if (flags & IOSQE_FIXED_FILE) {
1806                 if (unlikely(!ctx->user_files ||
1807                     (unsigned) fd >= ctx->nr_user_files))
1808                         return -EBADF;
1809                 req->file = ctx->user_files[fd];
1810                 req->flags |= REQ_F_FIXED_FILE;
1811         } else {
1812                 if (s->needs_fixed_file)
1813                         return -EBADF;
1814                 req->file = io_file_get(state, fd);
1815                 if (unlikely(!req->file))
1816                         return -EBADF;
1817         }
1818
1819         return 0;
1820 }
1821
1822 static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1823                          struct io_submit_state *state)
1824 {
1825         struct io_kiocb *req;
1826         int ret;
1827
1828         /* enforce forwards compatibility on users */
1829         if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)))
1830                 return -EINVAL;
1831
1832         req = io_get_req(ctx, state);
1833         if (unlikely(!req))
1834                 return -EAGAIN;
1835
1836         ret = io_req_set_file(ctx, s, state, req);
1837         if (unlikely(ret))
1838                 goto out;
1839
1840         ret = io_req_defer(ctx, req, s->sqe);
1841         if (ret) {
1842                 if (ret == -EIOCBQUEUED)
1843                         ret = 0;
1844                 return ret;
1845         }
1846
1847         ret = __io_submit_sqe(ctx, req, s, true);
1848         if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
1849                 struct io_uring_sqe *sqe_copy;
1850
1851                 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1852                 if (sqe_copy) {
1853                         struct async_list *list;
1854
1855                         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1856                         s->sqe = sqe_copy;
1857
1858                         memcpy(&req->submit, s, sizeof(*s));
1859                         list = io_async_list_from_sqe(ctx, s->sqe);
1860                         if (!io_add_to_prev_work(list, req)) {
1861                                 if (list)
1862                                         atomic_inc(&list->cnt);
1863                                 INIT_WORK(&req->work, io_sq_wq_submit_work);
1864                                 queue_work(ctx->sqo_wq, &req->work);
1865                         }
1866
1867                         /*
1868                          * Queued up for async execution, worker will release
1869                          * submit reference when the iocb is actually
1870                          * submitted.
1871                          */
1872                         return 0;
1873                 }
1874         }
1875
1876 out:
1877         /* drop submission reference */
1878         io_put_req(req);
1879
1880         /* and drop final reference, if we failed */
1881         if (ret)
1882                 io_put_req(req);
1883
1884         return ret;
1885 }
1886
1887 /*
1888  * Batched submission is done, ensure local IO is flushed out.
1889  */
1890 static void io_submit_state_end(struct io_submit_state *state)
1891 {
1892         blk_finish_plug(&state->plug);
1893         io_file_put(state);
1894         if (state->free_reqs)
1895                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
1896                                         &state->reqs[state->cur_req]);
1897 }
1898
1899 /*
1900  * Start submission side cache.
1901  */
1902 static void io_submit_state_start(struct io_submit_state *state,
1903                                   struct io_ring_ctx *ctx, unsigned max_ios)
1904 {
1905         blk_start_plug(&state->plug);
1906         state->free_reqs = 0;
1907         state->file = NULL;
1908         state->ios_left = max_ios;
1909 }
1910
1911 static void io_commit_sqring(struct io_ring_ctx *ctx)
1912 {
1913         struct io_sq_ring *ring = ctx->sq_ring;
1914
1915         if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1916                 /*
1917                  * Ensure any loads from the SQEs are done at this point,
1918                  * since once we write the new head, the application could
1919                  * write new data to them.
1920                  */
1921                 smp_store_release(&ring->r.head, ctx->cached_sq_head);
1922         }
1923 }
1924
1925 /*
1926  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1927  * that is mapped by userspace. This means that care needs to be taken to
1928  * ensure that reads are stable, as we cannot rely on userspace always
1929  * being a good citizen. If members of the sqe are validated and then later
1930  * used, it's important that those reads are done through READ_ONCE() to
1931  * prevent a re-load down the line.
1932  */
1933 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1934 {
1935         struct io_sq_ring *ring = ctx->sq_ring;
1936         unsigned head;
1937
1938         /*
1939          * The cached sq head (or cq tail) serves two purposes:
1940          *
1941          * 1) allows us to batch the cost of updating the user visible
1942          *    head updates.
1943          * 2) allows the kernel side to track the head on its own, even
1944          *    though the application is the one updating it.
1945          */
1946         head = ctx->cached_sq_head;
1947         /* make sure SQ entry isn't read before tail */
1948         if (head == smp_load_acquire(&ring->r.tail))
1949                 return false;
1950
1951         head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1952         if (head < ctx->sq_entries) {
1953                 s->index = head;
1954                 s->sqe = &ctx->sq_sqes[head];
1955                 ctx->cached_sq_head++;
1956                 return true;
1957         }
1958
1959         /* drop invalid entries */
1960         ctx->cached_sq_head++;
1961         ring->dropped++;
1962         return false;
1963 }
1964
1965 static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
1966                           unsigned int nr, bool has_user, bool mm_fault)
1967 {
1968         struct io_submit_state state, *statep = NULL;
1969         int ret, i, submitted = 0;
1970
1971         if (nr > IO_PLUG_THRESHOLD) {
1972                 io_submit_state_start(&state, ctx, nr);
1973                 statep = &state;
1974         }
1975
1976         for (i = 0; i < nr; i++) {
1977                 if (unlikely(mm_fault)) {
1978                         ret = -EFAULT;
1979                 } else {
1980                         sqes[i].has_user = has_user;
1981                         sqes[i].needs_lock = true;
1982                         sqes[i].needs_fixed_file = true;
1983                         ret = io_submit_sqe(ctx, &sqes[i], statep);
1984                 }
1985                 if (!ret) {
1986                         submitted++;
1987                         continue;
1988                 }
1989
1990                 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret);
1991         }
1992
1993         if (statep)
1994                 io_submit_state_end(&state);
1995
1996         return submitted;
1997 }
1998
1999 static int io_sq_thread(void *data)
2000 {
2001         struct sqe_submit sqes[IO_IOPOLL_BATCH];
2002         struct io_ring_ctx *ctx = data;
2003         struct mm_struct *cur_mm = NULL;
2004         mm_segment_t old_fs;
2005         DEFINE_WAIT(wait);
2006         unsigned inflight;
2007         unsigned long timeout;
2008
2009         old_fs = get_fs();
2010         set_fs(USER_DS);
2011
2012         timeout = inflight = 0;
2013         while (!kthread_should_park()) {
2014                 bool all_fixed, mm_fault = false;
2015                 int i;
2016
2017                 if (inflight) {
2018                         unsigned nr_events = 0;
2019
2020                         if (ctx->flags & IORING_SETUP_IOPOLL) {
2021                                 /*
2022                                  * We disallow the app entering submit/complete
2023                                  * with polling, but we still need to lock the
2024                                  * ring to prevent racing with polled issue
2025                                  * that got punted to a workqueue.
2026                                  */
2027                                 mutex_lock(&ctx->uring_lock);
2028                                 io_iopoll_check(ctx, &nr_events, 0);
2029                                 mutex_unlock(&ctx->uring_lock);
2030                         } else {
2031                                 /*
2032                                  * Normal IO, just pretend everything completed.
2033                                  * We don't have to poll completions for that.
2034                                  */
2035                                 nr_events = inflight;
2036                         }
2037
2038                         inflight -= nr_events;
2039                         if (!inflight)
2040                                 timeout = jiffies + ctx->sq_thread_idle;
2041                 }
2042
2043                 if (!io_get_sqring(ctx, &sqes[0])) {
2044                         /*
2045                          * We're polling. If we're within the defined idle
2046                          * period, then let us spin without work before going
2047                          * to sleep.
2048                          */
2049                         if (inflight || !time_after(jiffies, timeout)) {
2050                                 cpu_relax();
2051                                 continue;
2052                         }
2053
2054                         /*
2055                          * Drop cur_mm before scheduling, we can't hold it for
2056                          * long periods (or over schedule()). Do this before
2057                          * adding ourselves to the waitqueue, as the unuse/drop
2058                          * may sleep.
2059                          */
2060                         if (cur_mm) {
2061                                 unuse_mm(cur_mm);
2062                                 mmput(cur_mm);
2063                                 cur_mm = NULL;
2064                         }
2065
2066                         prepare_to_wait(&ctx->sqo_wait, &wait,
2067                                                 TASK_INTERRUPTIBLE);
2068
2069                         /* Tell userspace we may need a wakeup call */
2070                         ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
2071                         /* make sure to read SQ tail after writing flags */
2072                         smp_mb();
2073
2074                         if (!io_get_sqring(ctx, &sqes[0])) {
2075                                 if (kthread_should_park()) {
2076                                         finish_wait(&ctx->sqo_wait, &wait);
2077                                         break;
2078                                 }
2079                                 if (signal_pending(current))
2080                                         flush_signals(current);
2081                                 schedule();
2082                                 finish_wait(&ctx->sqo_wait, &wait);
2083
2084                                 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2085                                 continue;
2086                         }
2087                         finish_wait(&ctx->sqo_wait, &wait);
2088
2089                         ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2090                 }
2091
2092                 i = 0;
2093                 all_fixed = true;
2094                 do {
2095                         if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
2096                                 all_fixed = false;
2097
2098                         i++;
2099                         if (i == ARRAY_SIZE(sqes))
2100                                 break;
2101                 } while (io_get_sqring(ctx, &sqes[i]));
2102
2103                 /* Unless all new commands are FIXED regions, grab mm */
2104                 if (!all_fixed && !cur_mm) {
2105                         mm_fault = !mmget_not_zero(ctx->sqo_mm);
2106                         if (!mm_fault) {
2107                                 use_mm(ctx->sqo_mm);
2108                                 cur_mm = ctx->sqo_mm;
2109                         }
2110                 }
2111
2112                 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
2113                                                 mm_fault);
2114
2115                 /* Commit SQ ring head once we've consumed all SQEs */
2116                 io_commit_sqring(ctx);
2117         }
2118
2119         set_fs(old_fs);
2120         if (cur_mm) {
2121                 unuse_mm(cur_mm);
2122                 mmput(cur_mm);
2123         }
2124
2125         kthread_parkme();
2126
2127         return 0;
2128 }
2129
2130 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
2131 {
2132         struct io_submit_state state, *statep = NULL;
2133         int i, submit = 0;
2134
2135         if (to_submit > IO_PLUG_THRESHOLD) {
2136                 io_submit_state_start(&state, ctx, to_submit);
2137                 statep = &state;
2138         }
2139
2140         for (i = 0; i < to_submit; i++) {
2141                 struct sqe_submit s;
2142                 int ret;
2143
2144                 if (!io_get_sqring(ctx, &s))
2145                         break;
2146
2147                 s.has_user = true;
2148                 s.needs_lock = false;
2149                 s.needs_fixed_file = false;
2150                 submit++;
2151
2152                 ret = io_submit_sqe(ctx, &s, statep);
2153                 if (ret)
2154                         io_cqring_add_event(ctx, s.sqe->user_data, ret);
2155         }
2156         io_commit_sqring(ctx);
2157
2158         if (statep)
2159                 io_submit_state_end(statep);
2160
2161         return submit;
2162 }
2163
2164 static unsigned io_cqring_events(struct io_cq_ring *ring)
2165 {
2166         /* See comment at the top of this file */
2167         smp_rmb();
2168         return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
2169 }
2170
2171 /*
2172  * Wait until events become available, if we don't already have some. The
2173  * application must reap them itself, as they reside on the shared cq ring.
2174  */
2175 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2176                           const sigset_t __user *sig, size_t sigsz)
2177 {
2178         struct io_cq_ring *ring = ctx->cq_ring;
2179         sigset_t ksigmask, sigsaved;
2180         int ret;
2181
2182         if (io_cqring_events(ring) >= min_events)
2183                 return 0;
2184
2185         if (sig) {
2186 #ifdef CONFIG_COMPAT
2187                 if (in_compat_syscall())
2188                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2189                                                       &ksigmask, &sigsaved, sigsz);
2190                 else
2191 #endif
2192                         ret = set_user_sigmask(sig, &ksigmask,
2193                                                &sigsaved, sigsz);
2194
2195                 if (ret)
2196                         return ret;
2197         }
2198
2199         ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events);
2200
2201         if (sig)
2202                 restore_user_sigmask(sig, &sigsaved, ret == -ERESTARTSYS);
2203
2204         if (ret == -ERESTARTSYS)
2205                 ret = -EINTR;
2206
2207         return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
2208 }
2209
2210 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2211 {
2212 #if defined(CONFIG_UNIX)
2213         if (ctx->ring_sock) {
2214                 struct sock *sock = ctx->ring_sock->sk;
2215                 struct sk_buff *skb;
2216
2217                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2218                         kfree_skb(skb);
2219         }
2220 #else
2221         int i;
2222
2223         for (i = 0; i < ctx->nr_user_files; i++)
2224                 fput(ctx->user_files[i]);
2225 #endif
2226 }
2227
2228 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2229 {
2230         if (!ctx->user_files)
2231                 return -ENXIO;
2232
2233         __io_sqe_files_unregister(ctx);
2234         kfree(ctx->user_files);
2235         ctx->user_files = NULL;
2236         ctx->nr_user_files = 0;
2237         return 0;
2238 }
2239
2240 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2241 {
2242         if (ctx->sqo_thread) {
2243                 /*
2244                  * The park is a bit of a work-around, without it we get
2245                  * warning spews on shutdown with SQPOLL set and affinity
2246                  * set to a single CPU.
2247                  */
2248                 kthread_park(ctx->sqo_thread);
2249                 kthread_stop(ctx->sqo_thread);
2250                 ctx->sqo_thread = NULL;
2251         }
2252 }
2253
2254 static void io_finish_async(struct io_ring_ctx *ctx)
2255 {
2256         io_sq_thread_stop(ctx);
2257
2258         if (ctx->sqo_wq) {
2259                 destroy_workqueue(ctx->sqo_wq);
2260                 ctx->sqo_wq = NULL;
2261         }
2262 }
2263
2264 #if defined(CONFIG_UNIX)
2265 static void io_destruct_skb(struct sk_buff *skb)
2266 {
2267         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
2268
2269         io_finish_async(ctx);
2270         unix_destruct_scm(skb);
2271 }
2272
2273 /*
2274  * Ensure the UNIX gc is aware of our file set, so we are certain that
2275  * the io_uring can be safely unregistered on process exit, even if we have
2276  * loops in the file referencing.
2277  */
2278 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
2279 {
2280         struct sock *sk = ctx->ring_sock->sk;
2281         struct scm_fp_list *fpl;
2282         struct sk_buff *skb;
2283         int i;
2284
2285         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
2286                 unsigned long inflight = ctx->user->unix_inflight + nr;
2287
2288                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
2289                         return -EMFILE;
2290         }
2291
2292         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
2293         if (!fpl)
2294                 return -ENOMEM;
2295
2296         skb = alloc_skb(0, GFP_KERNEL);
2297         if (!skb) {
2298                 kfree(fpl);
2299                 return -ENOMEM;
2300         }
2301
2302         skb->sk = sk;
2303         skb->destructor = io_destruct_skb;
2304
2305         fpl->user = get_uid(ctx->user);
2306         for (i = 0; i < nr; i++) {
2307                 fpl->fp[i] = get_file(ctx->user_files[i + offset]);
2308                 unix_inflight(fpl->user, fpl->fp[i]);
2309         }
2310
2311         fpl->max = fpl->count = nr;
2312         UNIXCB(skb).fp = fpl;
2313         refcount_add(skb->truesize, &sk->sk_wmem_alloc);
2314         skb_queue_head(&sk->sk_receive_queue, skb);
2315
2316         for (i = 0; i < nr; i++)
2317                 fput(fpl->fp[i]);
2318
2319         return 0;
2320 }
2321
2322 /*
2323  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
2324  * causes regular reference counting to break down. We rely on the UNIX
2325  * garbage collection to take care of this problem for us.
2326  */
2327 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2328 {
2329         unsigned left, total;
2330         int ret = 0;
2331
2332         total = 0;
2333         left = ctx->nr_user_files;
2334         while (left) {
2335                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
2336
2337                 ret = __io_sqe_files_scm(ctx, this_files, total);
2338                 if (ret)
2339                         break;
2340                 left -= this_files;
2341                 total += this_files;
2342         }
2343
2344         if (!ret)
2345                 return 0;
2346
2347         while (total < ctx->nr_user_files) {
2348                 fput(ctx->user_files[total]);
2349                 total++;
2350         }
2351
2352         return ret;
2353 }
2354 #else
2355 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2356 {
2357         return 0;
2358 }
2359 #endif
2360
2361 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
2362                                  unsigned nr_args)
2363 {
2364         __s32 __user *fds = (__s32 __user *) arg;
2365         int fd, ret = 0;
2366         unsigned i;
2367
2368         if (ctx->user_files)
2369                 return -EBUSY;
2370         if (!nr_args)
2371                 return -EINVAL;
2372         if (nr_args > IORING_MAX_FIXED_FILES)
2373                 return -EMFILE;
2374
2375         ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
2376         if (!ctx->user_files)
2377                 return -ENOMEM;
2378
2379         for (i = 0; i < nr_args; i++) {
2380                 ret = -EFAULT;
2381                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
2382                         break;
2383
2384                 ctx->user_files[i] = fget(fd);
2385
2386                 ret = -EBADF;
2387                 if (!ctx->user_files[i])
2388                         break;
2389                 /*
2390                  * Don't allow io_uring instances to be registered. If UNIX
2391                  * isn't enabled, then this causes a reference cycle and this
2392                  * instance can never get freed. If UNIX is enabled we'll
2393                  * handle it just fine, but there's still no point in allowing
2394                  * a ring fd as it doesn't support regular read/write anyway.
2395                  */
2396                 if (ctx->user_files[i]->f_op == &io_uring_fops) {
2397                         fput(ctx->user_files[i]);
2398                         break;
2399                 }
2400                 ctx->nr_user_files++;
2401                 ret = 0;
2402         }
2403
2404         if (ret) {
2405                 for (i = 0; i < ctx->nr_user_files; i++)
2406                         fput(ctx->user_files[i]);
2407
2408                 kfree(ctx->user_files);
2409                 ctx->user_files = NULL;
2410                 ctx->nr_user_files = 0;
2411                 return ret;
2412         }
2413
2414         ret = io_sqe_files_scm(ctx);
2415         if (ret)
2416                 io_sqe_files_unregister(ctx);
2417
2418         return ret;
2419 }
2420
2421 static int io_sq_offload_start(struct io_ring_ctx *ctx,
2422                                struct io_uring_params *p)
2423 {
2424         int ret;
2425
2426         init_waitqueue_head(&ctx->sqo_wait);
2427         mmgrab(current->mm);
2428         ctx->sqo_mm = current->mm;
2429
2430         if (ctx->flags & IORING_SETUP_SQPOLL) {
2431                 ret = -EPERM;
2432                 if (!capable(CAP_SYS_ADMIN))
2433                         goto err;
2434
2435                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
2436                 if (!ctx->sq_thread_idle)
2437                         ctx->sq_thread_idle = HZ;
2438
2439                 if (p->flags & IORING_SETUP_SQ_AFF) {
2440                         int cpu = p->sq_thread_cpu;
2441
2442                         ret = -EINVAL;
2443                         if (cpu >= nr_cpu_ids)
2444                                 goto err;
2445                         if (!cpu_online(cpu))
2446                                 goto err;
2447
2448                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
2449                                                         ctx, cpu,
2450                                                         "io_uring-sq");
2451                 } else {
2452                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
2453                                                         "io_uring-sq");
2454                 }
2455                 if (IS_ERR(ctx->sqo_thread)) {
2456                         ret = PTR_ERR(ctx->sqo_thread);
2457                         ctx->sqo_thread = NULL;
2458                         goto err;
2459                 }
2460                 wake_up_process(ctx->sqo_thread);
2461         } else if (p->flags & IORING_SETUP_SQ_AFF) {
2462                 /* Can't have SQ_AFF without SQPOLL */
2463                 ret = -EINVAL;
2464                 goto err;
2465         }
2466
2467         /* Do QD, or 2 * CPUS, whatever is smallest */
2468         ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
2469                         min(ctx->sq_entries - 1, 2 * num_online_cpus()));
2470         if (!ctx->sqo_wq) {
2471                 ret = -ENOMEM;
2472                 goto err;
2473         }
2474
2475         return 0;
2476 err:
2477         io_sq_thread_stop(ctx);
2478         mmdrop(ctx->sqo_mm);
2479         ctx->sqo_mm = NULL;
2480         return ret;
2481 }
2482
2483 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
2484 {
2485         atomic_long_sub(nr_pages, &user->locked_vm);
2486 }
2487
2488 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
2489 {
2490         unsigned long page_limit, cur_pages, new_pages;
2491
2492         /* Don't allow more pages than we can safely lock */
2493         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
2494
2495         do {
2496                 cur_pages = atomic_long_read(&user->locked_vm);
2497                 new_pages = cur_pages + nr_pages;
2498                 if (new_pages > page_limit)
2499                         return -ENOMEM;
2500         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
2501                                         new_pages) != cur_pages);
2502
2503         return 0;
2504 }
2505
2506 static void io_mem_free(void *ptr)
2507 {
2508         struct page *page;
2509
2510         if (!ptr)
2511                 return;
2512
2513         page = virt_to_head_page(ptr);
2514         if (put_page_testzero(page))
2515                 free_compound_page(page);
2516 }
2517
2518 static void *io_mem_alloc(size_t size)
2519 {
2520         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
2521                                 __GFP_NORETRY;
2522
2523         return (void *) __get_free_pages(gfp_flags, get_order(size));
2524 }
2525
2526 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
2527 {
2528         struct io_sq_ring *sq_ring;
2529         struct io_cq_ring *cq_ring;
2530         size_t bytes;
2531
2532         bytes = struct_size(sq_ring, array, sq_entries);
2533         bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
2534         bytes += struct_size(cq_ring, cqes, cq_entries);
2535
2536         return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
2537 }
2538
2539 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
2540 {
2541         int i, j;
2542
2543         if (!ctx->user_bufs)
2544                 return -ENXIO;
2545
2546         for (i = 0; i < ctx->nr_user_bufs; i++) {
2547                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2548
2549                 for (j = 0; j < imu->nr_bvecs; j++)
2550                         put_page(imu->bvec[j].bv_page);
2551
2552                 if (ctx->account_mem)
2553                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
2554                 kvfree(imu->bvec);
2555                 imu->nr_bvecs = 0;
2556         }
2557
2558         kfree(ctx->user_bufs);
2559         ctx->user_bufs = NULL;
2560         ctx->nr_user_bufs = 0;
2561         return 0;
2562 }
2563
2564 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
2565                        void __user *arg, unsigned index)
2566 {
2567         struct iovec __user *src;
2568
2569 #ifdef CONFIG_COMPAT
2570         if (ctx->compat) {
2571                 struct compat_iovec __user *ciovs;
2572                 struct compat_iovec ciov;
2573
2574                 ciovs = (struct compat_iovec __user *) arg;
2575                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
2576                         return -EFAULT;
2577
2578                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
2579                 dst->iov_len = ciov.iov_len;
2580                 return 0;
2581         }
2582 #endif
2583         src = (struct iovec __user *) arg;
2584         if (copy_from_user(dst, &src[index], sizeof(*dst)))
2585                 return -EFAULT;
2586         return 0;
2587 }
2588
2589 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
2590                                   unsigned nr_args)
2591 {
2592         struct vm_area_struct **vmas = NULL;
2593         struct page **pages = NULL;
2594         int i, j, got_pages = 0;
2595         int ret = -EINVAL;
2596
2597         if (ctx->user_bufs)
2598                 return -EBUSY;
2599         if (!nr_args || nr_args > UIO_MAXIOV)
2600                 return -EINVAL;
2601
2602         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
2603                                         GFP_KERNEL);
2604         if (!ctx->user_bufs)
2605                 return -ENOMEM;
2606
2607         for (i = 0; i < nr_args; i++) {
2608                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2609                 unsigned long off, start, end, ubuf;
2610                 int pret, nr_pages;
2611                 struct iovec iov;
2612                 size_t size;
2613
2614                 ret = io_copy_iov(ctx, &iov, arg, i);
2615                 if (ret)
2616                         goto err;
2617
2618                 /*
2619                  * Don't impose further limits on the size and buffer
2620                  * constraints here, we'll -EINVAL later when IO is
2621                  * submitted if they are wrong.
2622                  */
2623                 ret = -EFAULT;
2624                 if (!iov.iov_base || !iov.iov_len)
2625                         goto err;
2626
2627                 /* arbitrary limit, but we need something */
2628                 if (iov.iov_len > SZ_1G)
2629                         goto err;
2630
2631                 ubuf = (unsigned long) iov.iov_base;
2632                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
2633                 start = ubuf >> PAGE_SHIFT;
2634                 nr_pages = end - start;
2635
2636                 if (ctx->account_mem) {
2637                         ret = io_account_mem(ctx->user, nr_pages);
2638                         if (ret)
2639                                 goto err;
2640                 }
2641
2642                 ret = 0;
2643                 if (!pages || nr_pages > got_pages) {
2644                         kfree(vmas);
2645                         kfree(pages);
2646                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
2647                                                 GFP_KERNEL);
2648                         vmas = kvmalloc_array(nr_pages,
2649                                         sizeof(struct vm_area_struct *),
2650                                         GFP_KERNEL);
2651                         if (!pages || !vmas) {
2652                                 ret = -ENOMEM;
2653                                 if (ctx->account_mem)
2654                                         io_unaccount_mem(ctx->user, nr_pages);
2655                                 goto err;
2656                         }
2657                         got_pages = nr_pages;
2658                 }
2659
2660                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
2661                                                 GFP_KERNEL);
2662                 ret = -ENOMEM;
2663                 if (!imu->bvec) {
2664                         if (ctx->account_mem)
2665                                 io_unaccount_mem(ctx->user, nr_pages);
2666                         goto err;
2667                 }
2668
2669                 ret = 0;
2670                 down_read(&current->mm->mmap_sem);
2671                 pret = get_user_pages(ubuf, nr_pages,
2672                                       FOLL_WRITE | FOLL_LONGTERM,
2673                                       pages, vmas);
2674                 if (pret == nr_pages) {
2675                         /* don't support file backed memory */
2676                         for (j = 0; j < nr_pages; j++) {
2677                                 struct vm_area_struct *vma = vmas[j];
2678
2679                                 if (vma->vm_file &&
2680                                     !is_file_hugepages(vma->vm_file)) {
2681                                         ret = -EOPNOTSUPP;
2682                                         break;
2683                                 }
2684                         }
2685                 } else {
2686                         ret = pret < 0 ? pret : -EFAULT;
2687                 }
2688                 up_read(&current->mm->mmap_sem);
2689                 if (ret) {
2690                         /*
2691                          * if we did partial map, or found file backed vmas,
2692                          * release any pages we did get
2693                          */
2694                         if (pret > 0) {
2695                                 for (j = 0; j < pret; j++)
2696                                         put_page(pages[j]);
2697                         }
2698                         if (ctx->account_mem)
2699                                 io_unaccount_mem(ctx->user, nr_pages);
2700                         kvfree(imu->bvec);
2701                         goto err;
2702                 }
2703
2704                 off = ubuf & ~PAGE_MASK;
2705                 size = iov.iov_len;
2706                 for (j = 0; j < nr_pages; j++) {
2707                         size_t vec_len;
2708
2709                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
2710                         imu->bvec[j].bv_page = pages[j];
2711                         imu->bvec[j].bv_len = vec_len;
2712                         imu->bvec[j].bv_offset = off;
2713                         off = 0;
2714                         size -= vec_len;
2715                 }
2716                 /* store original address for later verification */
2717                 imu->ubuf = ubuf;
2718                 imu->len = iov.iov_len;
2719                 imu->nr_bvecs = nr_pages;
2720
2721                 ctx->nr_user_bufs++;
2722         }
2723         kvfree(pages);
2724         kvfree(vmas);
2725         return 0;
2726 err:
2727         kvfree(pages);
2728         kvfree(vmas);
2729         io_sqe_buffer_unregister(ctx);
2730         return ret;
2731 }
2732
2733 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
2734 {
2735         __s32 __user *fds = arg;
2736         int fd;
2737
2738         if (ctx->cq_ev_fd)
2739                 return -EBUSY;
2740
2741         if (copy_from_user(&fd, fds, sizeof(*fds)))
2742                 return -EFAULT;
2743
2744         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
2745         if (IS_ERR(ctx->cq_ev_fd)) {
2746                 int ret = PTR_ERR(ctx->cq_ev_fd);
2747                 ctx->cq_ev_fd = NULL;
2748                 return ret;
2749         }
2750
2751         return 0;
2752 }
2753
2754 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
2755 {
2756         if (ctx->cq_ev_fd) {
2757                 eventfd_ctx_put(ctx->cq_ev_fd);
2758                 ctx->cq_ev_fd = NULL;
2759                 return 0;
2760         }
2761
2762         return -ENXIO;
2763 }
2764
2765 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
2766 {
2767         io_finish_async(ctx);
2768         if (ctx->sqo_mm)
2769                 mmdrop(ctx->sqo_mm);
2770
2771         io_iopoll_reap_events(ctx);
2772         io_sqe_buffer_unregister(ctx);
2773         io_sqe_files_unregister(ctx);
2774         io_eventfd_unregister(ctx);
2775
2776 #if defined(CONFIG_UNIX)
2777         if (ctx->ring_sock) {
2778                 ctx->ring_sock->file = NULL; /* so that iput() is called */
2779                 sock_release(ctx->ring_sock);
2780         }
2781 #endif
2782
2783         io_mem_free(ctx->sq_ring);
2784         io_mem_free(ctx->sq_sqes);
2785         io_mem_free(ctx->cq_ring);
2786
2787         percpu_ref_exit(&ctx->refs);
2788         if (ctx->account_mem)
2789                 io_unaccount_mem(ctx->user,
2790                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
2791         free_uid(ctx->user);
2792         kfree(ctx);
2793 }
2794
2795 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2796 {
2797         struct io_ring_ctx *ctx = file->private_data;
2798         __poll_t mask = 0;
2799
2800         poll_wait(file, &ctx->cq_wait, wait);
2801         /*
2802          * synchronizes with barrier from wq_has_sleeper call in
2803          * io_commit_cqring
2804          */
2805         smp_rmb();
2806         if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
2807             ctx->sq_ring->ring_entries)
2808                 mask |= EPOLLOUT | EPOLLWRNORM;
2809         if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
2810                 mask |= EPOLLIN | EPOLLRDNORM;
2811
2812         return mask;
2813 }
2814
2815 static int io_uring_fasync(int fd, struct file *file, int on)
2816 {
2817         struct io_ring_ctx *ctx = file->private_data;
2818
2819         return fasync_helper(fd, file, on, &ctx->cq_fasync);
2820 }
2821
2822 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2823 {
2824         mutex_lock(&ctx->uring_lock);
2825         percpu_ref_kill(&ctx->refs);
2826         mutex_unlock(&ctx->uring_lock);
2827
2828         io_poll_remove_all(ctx);
2829         io_iopoll_reap_events(ctx);
2830         wait_for_completion(&ctx->ctx_done);
2831         io_ring_ctx_free(ctx);
2832 }
2833
2834 static int io_uring_release(struct inode *inode, struct file *file)
2835 {
2836         struct io_ring_ctx *ctx = file->private_data;
2837
2838         file->private_data = NULL;
2839         io_ring_ctx_wait_and_kill(ctx);
2840         return 0;
2841 }
2842
2843 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2844 {
2845         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
2846         unsigned long sz = vma->vm_end - vma->vm_start;
2847         struct io_ring_ctx *ctx = file->private_data;
2848         unsigned long pfn;
2849         struct page *page;
2850         void *ptr;
2851
2852         switch (offset) {
2853         case IORING_OFF_SQ_RING:
2854                 ptr = ctx->sq_ring;
2855                 break;
2856         case IORING_OFF_SQES:
2857                 ptr = ctx->sq_sqes;
2858                 break;
2859         case IORING_OFF_CQ_RING:
2860                 ptr = ctx->cq_ring;
2861                 break;
2862         default:
2863                 return -EINVAL;
2864         }
2865
2866         page = virt_to_head_page(ptr);
2867         if (sz > (PAGE_SIZE << compound_order(page)))
2868                 return -EINVAL;
2869
2870         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2871         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2872 }
2873
2874 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2875                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
2876                 size_t, sigsz)
2877 {
2878         struct io_ring_ctx *ctx;
2879         long ret = -EBADF;
2880         int submitted = 0;
2881         struct fd f;
2882
2883         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
2884                 return -EINVAL;
2885
2886         f = fdget(fd);
2887         if (!f.file)
2888                 return -EBADF;
2889
2890         ret = -EOPNOTSUPP;
2891         if (f.file->f_op != &io_uring_fops)
2892                 goto out_fput;
2893
2894         ret = -ENXIO;
2895         ctx = f.file->private_data;
2896         if (!percpu_ref_tryget(&ctx->refs))
2897                 goto out_fput;
2898
2899         /*
2900          * For SQ polling, the thread will do all submissions and completions.
2901          * Just return the requested submit count, and wake the thread if
2902          * we were asked to.
2903          */
2904         if (ctx->flags & IORING_SETUP_SQPOLL) {
2905                 if (flags & IORING_ENTER_SQ_WAKEUP)
2906                         wake_up(&ctx->sqo_wait);
2907                 submitted = to_submit;
2908                 goto out_ctx;
2909         }
2910
2911         ret = 0;
2912         if (to_submit) {
2913                 to_submit = min(to_submit, ctx->sq_entries);
2914
2915                 mutex_lock(&ctx->uring_lock);
2916                 submitted = io_ring_submit(ctx, to_submit);
2917                 mutex_unlock(&ctx->uring_lock);
2918         }
2919         if (flags & IORING_ENTER_GETEVENTS) {
2920                 unsigned nr_events = 0;
2921
2922                 min_complete = min(min_complete, ctx->cq_entries);
2923
2924                 if (ctx->flags & IORING_SETUP_IOPOLL) {
2925                         mutex_lock(&ctx->uring_lock);
2926                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
2927                         mutex_unlock(&ctx->uring_lock);
2928                 } else {
2929                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
2930                 }
2931         }
2932
2933 out_ctx:
2934         io_ring_drop_ctx_refs(ctx, 1);
2935 out_fput:
2936         fdput(f);
2937         return submitted ? submitted : ret;
2938 }
2939
2940 static const struct file_operations io_uring_fops = {
2941         .release        = io_uring_release,
2942         .mmap           = io_uring_mmap,
2943         .poll           = io_uring_poll,
2944         .fasync         = io_uring_fasync,
2945 };
2946
2947 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
2948                                   struct io_uring_params *p)
2949 {
2950         struct io_sq_ring *sq_ring;
2951         struct io_cq_ring *cq_ring;
2952         size_t size;
2953
2954         sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
2955         if (!sq_ring)
2956                 return -ENOMEM;
2957
2958         ctx->sq_ring = sq_ring;
2959         sq_ring->ring_mask = p->sq_entries - 1;
2960         sq_ring->ring_entries = p->sq_entries;
2961         ctx->sq_mask = sq_ring->ring_mask;
2962         ctx->sq_entries = sq_ring->ring_entries;
2963
2964         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
2965         if (size == SIZE_MAX)
2966                 return -EOVERFLOW;
2967
2968         ctx->sq_sqes = io_mem_alloc(size);
2969         if (!ctx->sq_sqes)
2970                 return -ENOMEM;
2971
2972         cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
2973         if (!cq_ring)
2974                 return -ENOMEM;
2975
2976         ctx->cq_ring = cq_ring;
2977         cq_ring->ring_mask = p->cq_entries - 1;
2978         cq_ring->ring_entries = p->cq_entries;
2979         ctx->cq_mask = cq_ring->ring_mask;
2980         ctx->cq_entries = cq_ring->ring_entries;
2981         return 0;
2982 }
2983
2984 /*
2985  * Allocate an anonymous fd, this is what constitutes the application
2986  * visible backing of an io_uring instance. The application mmaps this
2987  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
2988  * we have to tie this fd to a socket for file garbage collection purposes.
2989  */
2990 static int io_uring_get_fd(struct io_ring_ctx *ctx)
2991 {
2992         struct file *file;
2993         int ret;
2994
2995 #if defined(CONFIG_UNIX)
2996         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
2997                                 &ctx->ring_sock);
2998         if (ret)
2999                 return ret;
3000 #endif
3001
3002         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3003         if (ret < 0)
3004                 goto err;
3005
3006         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
3007                                         O_RDWR | O_CLOEXEC);
3008         if (IS_ERR(file)) {
3009                 put_unused_fd(ret);
3010                 ret = PTR_ERR(file);
3011                 goto err;
3012         }
3013
3014 #if defined(CONFIG_UNIX)
3015         ctx->ring_sock->file = file;
3016         ctx->ring_sock->sk->sk_user_data = ctx;
3017 #endif
3018         fd_install(ret, file);
3019         return ret;
3020 err:
3021 #if defined(CONFIG_UNIX)
3022         sock_release(ctx->ring_sock);
3023         ctx->ring_sock = NULL;
3024 #endif
3025         return ret;
3026 }
3027
3028 static int io_uring_create(unsigned entries, struct io_uring_params *p)
3029 {
3030         struct user_struct *user = NULL;
3031         struct io_ring_ctx *ctx;
3032         bool account_mem;
3033         int ret;
3034
3035         if (!entries || entries > IORING_MAX_ENTRIES)
3036                 return -EINVAL;
3037
3038         /*
3039          * Use twice as many entries for the CQ ring. It's possible for the
3040          * application to drive a higher depth than the size of the SQ ring,
3041          * since the sqes are only used at submission time. This allows for
3042          * some flexibility in overcommitting a bit.
3043          */
3044         p->sq_entries = roundup_pow_of_two(entries);
3045         p->cq_entries = 2 * p->sq_entries;
3046
3047         user = get_uid(current_user());
3048         account_mem = !capable(CAP_IPC_LOCK);
3049
3050         if (account_mem) {
3051                 ret = io_account_mem(user,
3052                                 ring_pages(p->sq_entries, p->cq_entries));
3053                 if (ret) {
3054                         free_uid(user);
3055                         return ret;
3056                 }
3057         }
3058
3059         ctx = io_ring_ctx_alloc(p);
3060         if (!ctx) {
3061                 if (account_mem)
3062                         io_unaccount_mem(user, ring_pages(p->sq_entries,
3063                                                                 p->cq_entries));
3064                 free_uid(user);
3065                 return -ENOMEM;
3066         }
3067         ctx->compat = in_compat_syscall();
3068         ctx->account_mem = account_mem;
3069         ctx->user = user;
3070
3071         ret = io_allocate_scq_urings(ctx, p);
3072         if (ret)
3073                 goto err;
3074
3075         ret = io_sq_offload_start(ctx, p);
3076         if (ret)
3077                 goto err;
3078
3079         ret = io_uring_get_fd(ctx);
3080         if (ret < 0)
3081                 goto err;
3082
3083         memset(&p->sq_off, 0, sizeof(p->sq_off));
3084         p->sq_off.head = offsetof(struct io_sq_ring, r.head);
3085         p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
3086         p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
3087         p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
3088         p->sq_off.flags = offsetof(struct io_sq_ring, flags);
3089         p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
3090         p->sq_off.array = offsetof(struct io_sq_ring, array);
3091
3092         memset(&p->cq_off, 0, sizeof(p->cq_off));
3093         p->cq_off.head = offsetof(struct io_cq_ring, r.head);
3094         p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
3095         p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
3096         p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
3097         p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
3098         p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
3099         return ret;
3100 err:
3101         io_ring_ctx_wait_and_kill(ctx);
3102         return ret;
3103 }
3104
3105 /*
3106  * Sets up an aio uring context, and returns the fd. Applications asks for a
3107  * ring size, we return the actual sq/cq ring sizes (among other things) in the
3108  * params structure passed in.
3109  */
3110 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
3111 {
3112         struct io_uring_params p;
3113         long ret;
3114         int i;
3115
3116         if (copy_from_user(&p, params, sizeof(p)))
3117                 return -EFAULT;
3118         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
3119                 if (p.resv[i])
3120                         return -EINVAL;
3121         }
3122
3123         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
3124                         IORING_SETUP_SQ_AFF))
3125                 return -EINVAL;
3126
3127         ret = io_uring_create(entries, &p);
3128         if (ret < 0)
3129                 return ret;
3130
3131         if (copy_to_user(params, &p, sizeof(p)))
3132                 return -EFAULT;
3133
3134         return ret;
3135 }
3136
3137 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
3138                 struct io_uring_params __user *, params)
3139 {
3140         return io_uring_setup(entries, params);
3141 }
3142
3143 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
3144                                void __user *arg, unsigned nr_args)
3145         __releases(ctx->uring_lock)
3146         __acquires(ctx->uring_lock)
3147 {
3148         int ret;
3149
3150         /*
3151          * We're inside the ring mutex, if the ref is already dying, then
3152          * someone else killed the ctx or is already going through
3153          * io_uring_register().
3154          */
3155         if (percpu_ref_is_dying(&ctx->refs))
3156                 return -ENXIO;
3157
3158         percpu_ref_kill(&ctx->refs);
3159
3160         /*
3161          * Drop uring mutex before waiting for references to exit. If another
3162          * thread is currently inside io_uring_enter() it might need to grab
3163          * the uring_lock to make progress. If we hold it here across the drain
3164          * wait, then we can deadlock. It's safe to drop the mutex here, since
3165          * no new references will come in after we've killed the percpu ref.
3166          */
3167         mutex_unlock(&ctx->uring_lock);
3168         wait_for_completion(&ctx->ctx_done);
3169         mutex_lock(&ctx->uring_lock);
3170
3171         switch (opcode) {
3172         case IORING_REGISTER_BUFFERS:
3173                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
3174                 break;
3175         case IORING_UNREGISTER_BUFFERS:
3176                 ret = -EINVAL;
3177                 if (arg || nr_args)
3178                         break;
3179                 ret = io_sqe_buffer_unregister(ctx);
3180                 break;
3181         case IORING_REGISTER_FILES:
3182                 ret = io_sqe_files_register(ctx, arg, nr_args);
3183                 break;
3184         case IORING_UNREGISTER_FILES:
3185                 ret = -EINVAL;
3186                 if (arg || nr_args)
3187                         break;
3188                 ret = io_sqe_files_unregister(ctx);
3189                 break;
3190         case IORING_REGISTER_EVENTFD:
3191                 ret = -EINVAL;
3192                 if (nr_args != 1)
3193                         break;
3194                 ret = io_eventfd_register(ctx, arg);
3195                 break;
3196         case IORING_UNREGISTER_EVENTFD:
3197                 ret = -EINVAL;
3198                 if (arg || nr_args)
3199                         break;
3200                 ret = io_eventfd_unregister(ctx);
3201                 break;
3202         default:
3203                 ret = -EINVAL;
3204                 break;
3205         }
3206
3207         /* bring the ctx back to life */
3208         reinit_completion(&ctx->ctx_done);
3209         percpu_ref_reinit(&ctx->refs);
3210         return ret;
3211 }
3212
3213 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
3214                 void __user *, arg, unsigned int, nr_args)
3215 {
3216         struct io_ring_ctx *ctx;
3217         long ret = -EBADF;
3218         struct fd f;
3219
3220         f = fdget(fd);
3221         if (!f.file)
3222                 return -EBADF;
3223
3224         ret = -EOPNOTSUPP;
3225         if (f.file->f_op != &io_uring_fops)
3226                 goto out_fput;
3227
3228         ctx = f.file->private_data;
3229
3230         mutex_lock(&ctx->uring_lock);
3231         ret = __io_uring_register(ctx, opcode, arg, nr_args);
3232         mutex_unlock(&ctx->uring_lock);
3233 out_fput:
3234         fdput(f);
3235         return ret;
3236 }
3237
3238 static int __init io_uring_init(void)
3239 {
3240         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
3241         return 0;
3242 };
3243 __initcall(io_uring_init);