s390/vdso: drop unnecessary cc-ldoption
[linux-2.6-microblaze.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side. When the application reads the CQ ring
8  * tail, it must use an appropriate smp_rmb() to order with the smp_wmb()
9  * the kernel uses after writing the tail. Failure to do so could cause a
10  * delay in when the application notices that completion events available.
11  * This isn't a fatal condition. Likewise, the application must use an
12  * appropriate smp_wmb() both before writing the SQ tail, and after writing
13  * the SQ tail. The first one orders the sqe writes with the tail write, and
14  * the latter is paired with the smp_rmb() the kernel will issue before
15  * reading the SQ tail on submission.
16  *
17  * Also see the examples in the liburing library:
18  *
19  *      git://git.kernel.dk/liburing
20  *
21  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
22  * from data shared between the kernel and application. This is done both
23  * for ordering purposes, but also to ensure that once a value is loaded from
24  * data that the application could potentially modify, it remains stable.
25  *
26  * Copyright (C) 2018-2019 Jens Axboe
27  * Copyright (c) 2018-2019 Christoph Hellwig
28  */
29 #include <linux/kernel.h>
30 #include <linux/init.h>
31 #include <linux/errno.h>
32 #include <linux/syscalls.h>
33 #include <linux/compat.h>
34 #include <linux/refcount.h>
35 #include <linux/uio.h>
36
37 #include <linux/sched/signal.h>
38 #include <linux/fs.h>
39 #include <linux/file.h>
40 #include <linux/fdtable.h>
41 #include <linux/mm.h>
42 #include <linux/mman.h>
43 #include <linux/mmu_context.h>
44 #include <linux/percpu.h>
45 #include <linux/slab.h>
46 #include <linux/workqueue.h>
47 #include <linux/kthread.h>
48 #include <linux/blkdev.h>
49 #include <linux/bvec.h>
50 #include <linux/net.h>
51 #include <net/sock.h>
52 #include <net/af_unix.h>
53 #include <net/scm.h>
54 #include <linux/anon_inodes.h>
55 #include <linux/sched/mm.h>
56 #include <linux/uaccess.h>
57 #include <linux/nospec.h>
58 #include <linux/sizes.h>
59 #include <linux/hugetlb.h>
60
61 #include <uapi/linux/io_uring.h>
62
63 #include "internal.h"
64
65 #define IORING_MAX_ENTRIES      4096
66 #define IORING_MAX_FIXED_FILES  1024
67
68 struct io_uring {
69         u32 head ____cacheline_aligned_in_smp;
70         u32 tail ____cacheline_aligned_in_smp;
71 };
72
73 struct io_sq_ring {
74         struct io_uring         r;
75         u32                     ring_mask;
76         u32                     ring_entries;
77         u32                     dropped;
78         u32                     flags;
79         u32                     array[];
80 };
81
82 struct io_cq_ring {
83         struct io_uring         r;
84         u32                     ring_mask;
85         u32                     ring_entries;
86         u32                     overflow;
87         struct io_uring_cqe     cqes[];
88 };
89
90 struct io_mapped_ubuf {
91         u64             ubuf;
92         size_t          len;
93         struct          bio_vec *bvec;
94         unsigned int    nr_bvecs;
95 };
96
97 struct async_list {
98         spinlock_t              lock;
99         atomic_t                cnt;
100         struct list_head        list;
101
102         struct file             *file;
103         off_t                   io_end;
104         size_t                  io_pages;
105 };
106
107 struct io_ring_ctx {
108         struct {
109                 struct percpu_ref       refs;
110         } ____cacheline_aligned_in_smp;
111
112         struct {
113                 unsigned int            flags;
114                 bool                    compat;
115                 bool                    account_mem;
116
117                 /* SQ ring */
118                 struct io_sq_ring       *sq_ring;
119                 unsigned                cached_sq_head;
120                 unsigned                sq_entries;
121                 unsigned                sq_mask;
122                 unsigned                sq_thread_idle;
123                 struct io_uring_sqe     *sq_sqes;
124         } ____cacheline_aligned_in_smp;
125
126         /* IO offload */
127         struct workqueue_struct *sqo_wq;
128         struct task_struct      *sqo_thread;    /* if using sq thread polling */
129         struct mm_struct        *sqo_mm;
130         wait_queue_head_t       sqo_wait;
131         unsigned                sqo_stop;
132
133         struct {
134                 /* CQ ring */
135                 struct io_cq_ring       *cq_ring;
136                 unsigned                cached_cq_tail;
137                 unsigned                cq_entries;
138                 unsigned                cq_mask;
139                 struct wait_queue_head  cq_wait;
140                 struct fasync_struct    *cq_fasync;
141         } ____cacheline_aligned_in_smp;
142
143         /*
144          * If used, fixed file set. Writers must ensure that ->refs is dead,
145          * readers must ensure that ->refs is alive as long as the file* is
146          * used. Only updated through io_uring_register(2).
147          */
148         struct file             **user_files;
149         unsigned                nr_user_files;
150
151         /* if used, fixed mapped user buffers */
152         unsigned                nr_user_bufs;
153         struct io_mapped_ubuf   *user_bufs;
154
155         struct user_struct      *user;
156
157         struct completion       ctx_done;
158
159         struct {
160                 struct mutex            uring_lock;
161                 wait_queue_head_t       wait;
162         } ____cacheline_aligned_in_smp;
163
164         struct {
165                 spinlock_t              completion_lock;
166                 bool                    poll_multi_file;
167                 /*
168                  * ->poll_list is protected by the ctx->uring_lock for
169                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
170                  * For SQPOLL, only the single threaded io_sq_thread() will
171                  * manipulate the list, hence no extra locking is needed there.
172                  */
173                 struct list_head        poll_list;
174                 struct list_head        cancel_list;
175         } ____cacheline_aligned_in_smp;
176
177         struct async_list       pending_async[2];
178
179 #if defined(CONFIG_UNIX)
180         struct socket           *ring_sock;
181 #endif
182 };
183
184 struct sqe_submit {
185         const struct io_uring_sqe       *sqe;
186         unsigned short                  index;
187         bool                            has_user;
188         bool                            needs_lock;
189         bool                            needs_fixed_file;
190 };
191
192 /*
193  * First field must be the file pointer in all the
194  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
195  */
196 struct io_poll_iocb {
197         struct file                     *file;
198         struct wait_queue_head          *head;
199         __poll_t                        events;
200         bool                            done;
201         bool                            canceled;
202         struct wait_queue_entry         wait;
203 };
204
205 /*
206  * NOTE! Each of the iocb union members has the file pointer
207  * as the first entry in their struct definition. So you can
208  * access the file pointer through any of the sub-structs,
209  * or directly as just 'ki_filp' in this struct.
210  */
211 struct io_kiocb {
212         union {
213                 struct file             *file;
214                 struct kiocb            rw;
215                 struct io_poll_iocb     poll;
216         };
217
218         struct sqe_submit       submit;
219
220         struct io_ring_ctx      *ctx;
221         struct list_head        list;
222         unsigned int            flags;
223         refcount_t              refs;
224 #define REQ_F_FORCE_NONBLOCK    1       /* inline submission attempt */
225 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
226 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
227 #define REQ_F_SEQ_PREV          8       /* sequential with previous */
228 #define REQ_F_PREPPED           16      /* prep already done */
229         u64                     user_data;
230         u64                     error;
231
232         struct work_struct      work;
233 };
234
235 #define IO_PLUG_THRESHOLD               2
236 #define IO_IOPOLL_BATCH                 8
237
238 struct io_submit_state {
239         struct blk_plug         plug;
240
241         /*
242          * io_kiocb alloc cache
243          */
244         void                    *reqs[IO_IOPOLL_BATCH];
245         unsigned                int free_reqs;
246         unsigned                int cur_req;
247
248         /*
249          * File reference cache
250          */
251         struct file             *file;
252         unsigned int            fd;
253         unsigned int            has_refs;
254         unsigned int            used_refs;
255         unsigned int            ios_left;
256 };
257
258 static struct kmem_cache *req_cachep;
259
260 static const struct file_operations io_uring_fops;
261
262 struct sock *io_uring_get_socket(struct file *file)
263 {
264 #if defined(CONFIG_UNIX)
265         if (file->f_op == &io_uring_fops) {
266                 struct io_ring_ctx *ctx = file->private_data;
267
268                 return ctx->ring_sock->sk;
269         }
270 #endif
271         return NULL;
272 }
273 EXPORT_SYMBOL(io_uring_get_socket);
274
275 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
276 {
277         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
278
279         complete(&ctx->ctx_done);
280 }
281
282 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
283 {
284         struct io_ring_ctx *ctx;
285         int i;
286
287         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
288         if (!ctx)
289                 return NULL;
290
291         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
292                 kfree(ctx);
293                 return NULL;
294         }
295
296         ctx->flags = p->flags;
297         init_waitqueue_head(&ctx->cq_wait);
298         init_completion(&ctx->ctx_done);
299         mutex_init(&ctx->uring_lock);
300         init_waitqueue_head(&ctx->wait);
301         for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
302                 spin_lock_init(&ctx->pending_async[i].lock);
303                 INIT_LIST_HEAD(&ctx->pending_async[i].list);
304                 atomic_set(&ctx->pending_async[i].cnt, 0);
305         }
306         spin_lock_init(&ctx->completion_lock);
307         INIT_LIST_HEAD(&ctx->poll_list);
308         INIT_LIST_HEAD(&ctx->cancel_list);
309         return ctx;
310 }
311
312 static void io_commit_cqring(struct io_ring_ctx *ctx)
313 {
314         struct io_cq_ring *ring = ctx->cq_ring;
315
316         if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
317                 /* order cqe stores with ring update */
318                 smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
319
320                 /*
321                  * Write sider barrier of tail update, app has read side. See
322                  * comment at the top of this file.
323                  */
324                 smp_wmb();
325
326                 if (wq_has_sleeper(&ctx->cq_wait)) {
327                         wake_up_interruptible(&ctx->cq_wait);
328                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
329                 }
330         }
331 }
332
333 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
334 {
335         struct io_cq_ring *ring = ctx->cq_ring;
336         unsigned tail;
337
338         tail = ctx->cached_cq_tail;
339         /* See comment at the top of the file */
340         smp_rmb();
341         if (tail + 1 == READ_ONCE(ring->r.head))
342                 return NULL;
343
344         ctx->cached_cq_tail++;
345         return &ring->cqes[tail & ctx->cq_mask];
346 }
347
348 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
349                                  long res, unsigned ev_flags)
350 {
351         struct io_uring_cqe *cqe;
352
353         /*
354          * If we can't get a cq entry, userspace overflowed the
355          * submission (by quite a lot). Increment the overflow count in
356          * the ring.
357          */
358         cqe = io_get_cqring(ctx);
359         if (cqe) {
360                 WRITE_ONCE(cqe->user_data, ki_user_data);
361                 WRITE_ONCE(cqe->res, res);
362                 WRITE_ONCE(cqe->flags, ev_flags);
363         } else {
364                 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
365
366                 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
367         }
368 }
369
370 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
371 {
372         if (waitqueue_active(&ctx->wait))
373                 wake_up(&ctx->wait);
374         if (waitqueue_active(&ctx->sqo_wait))
375                 wake_up(&ctx->sqo_wait);
376 }
377
378 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
379                                 long res, unsigned ev_flags)
380 {
381         unsigned long flags;
382
383         spin_lock_irqsave(&ctx->completion_lock, flags);
384         io_cqring_fill_event(ctx, user_data, res, ev_flags);
385         io_commit_cqring(ctx);
386         spin_unlock_irqrestore(&ctx->completion_lock, flags);
387
388         io_cqring_ev_posted(ctx);
389 }
390
391 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
392 {
393         percpu_ref_put_many(&ctx->refs, refs);
394
395         if (waitqueue_active(&ctx->wait))
396                 wake_up(&ctx->wait);
397 }
398
399 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
400                                    struct io_submit_state *state)
401 {
402         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
403         struct io_kiocb *req;
404
405         if (!percpu_ref_tryget(&ctx->refs))
406                 return NULL;
407
408         if (!state) {
409                 req = kmem_cache_alloc(req_cachep, gfp);
410                 if (unlikely(!req))
411                         goto out;
412         } else if (!state->free_reqs) {
413                 size_t sz;
414                 int ret;
415
416                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
417                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
418
419                 /*
420                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
421                  * retry single alloc to be on the safe side.
422                  */
423                 if (unlikely(ret <= 0)) {
424                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
425                         if (!state->reqs[0])
426                                 goto out;
427                         ret = 1;
428                 }
429                 state->free_reqs = ret - 1;
430                 state->cur_req = 1;
431                 req = state->reqs[0];
432         } else {
433                 req = state->reqs[state->cur_req];
434                 state->free_reqs--;
435                 state->cur_req++;
436         }
437
438         req->ctx = ctx;
439         req->flags = 0;
440         /* one is dropped after submission, the other at completion */
441         refcount_set(&req->refs, 2);
442         return req;
443 out:
444         io_ring_drop_ctx_refs(ctx, 1);
445         return NULL;
446 }
447
448 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
449 {
450         if (*nr) {
451                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
452                 io_ring_drop_ctx_refs(ctx, *nr);
453                 *nr = 0;
454         }
455 }
456
457 static void io_free_req(struct io_kiocb *req)
458 {
459         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
460                 fput(req->file);
461         io_ring_drop_ctx_refs(req->ctx, 1);
462         kmem_cache_free(req_cachep, req);
463 }
464
465 static void io_put_req(struct io_kiocb *req)
466 {
467         if (refcount_dec_and_test(&req->refs))
468                 io_free_req(req);
469 }
470
471 /*
472  * Find and free completed poll iocbs
473  */
474 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
475                                struct list_head *done)
476 {
477         void *reqs[IO_IOPOLL_BATCH];
478         struct io_kiocb *req;
479         int to_free;
480
481         to_free = 0;
482         while (!list_empty(done)) {
483                 req = list_first_entry(done, struct io_kiocb, list);
484                 list_del(&req->list);
485
486                 io_cqring_fill_event(ctx, req->user_data, req->error, 0);
487                 (*nr_events)++;
488
489                 if (refcount_dec_and_test(&req->refs)) {
490                         /* If we're not using fixed files, we have to pair the
491                          * completion part with the file put. Use regular
492                          * completions for those, only batch free for fixed
493                          * file.
494                          */
495                         if (req->flags & REQ_F_FIXED_FILE) {
496                                 reqs[to_free++] = req;
497                                 if (to_free == ARRAY_SIZE(reqs))
498                                         io_free_req_many(ctx, reqs, &to_free);
499                         } else {
500                                 io_free_req(req);
501                         }
502                 }
503         }
504
505         io_commit_cqring(ctx);
506         io_free_req_many(ctx, reqs, &to_free);
507 }
508
509 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
510                         long min)
511 {
512         struct io_kiocb *req, *tmp;
513         LIST_HEAD(done);
514         bool spin;
515         int ret;
516
517         /*
518          * Only spin for completions if we don't have multiple devices hanging
519          * off our complete list, and we're under the requested amount.
520          */
521         spin = !ctx->poll_multi_file && *nr_events < min;
522
523         ret = 0;
524         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
525                 struct kiocb *kiocb = &req->rw;
526
527                 /*
528                  * Move completed entries to our local list. If we find a
529                  * request that requires polling, break out and complete
530                  * the done list first, if we have entries there.
531                  */
532                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
533                         list_move_tail(&req->list, &done);
534                         continue;
535                 }
536                 if (!list_empty(&done))
537                         break;
538
539                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
540                 if (ret < 0)
541                         break;
542
543                 if (ret && spin)
544                         spin = false;
545                 ret = 0;
546         }
547
548         if (!list_empty(&done))
549                 io_iopoll_complete(ctx, nr_events, &done);
550
551         return ret;
552 }
553
554 /*
555  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
556  * non-spinning poll check - we'll still enter the driver poll loop, but only
557  * as a non-spinning completion check.
558  */
559 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
560                                 long min)
561 {
562         while (!list_empty(&ctx->poll_list)) {
563                 int ret;
564
565                 ret = io_do_iopoll(ctx, nr_events, min);
566                 if (ret < 0)
567                         return ret;
568                 if (!min || *nr_events >= min)
569                         return 0;
570         }
571
572         return 1;
573 }
574
575 /*
576  * We can't just wait for polled events to come to us, we have to actively
577  * find and complete them.
578  */
579 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
580 {
581         if (!(ctx->flags & IORING_SETUP_IOPOLL))
582                 return;
583
584         mutex_lock(&ctx->uring_lock);
585         while (!list_empty(&ctx->poll_list)) {
586                 unsigned int nr_events = 0;
587
588                 io_iopoll_getevents(ctx, &nr_events, 1);
589         }
590         mutex_unlock(&ctx->uring_lock);
591 }
592
593 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
594                            long min)
595 {
596         int ret = 0;
597
598         do {
599                 int tmin = 0;
600
601                 if (*nr_events < min)
602                         tmin = min - *nr_events;
603
604                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
605                 if (ret <= 0)
606                         break;
607                 ret = 0;
608         } while (min && !*nr_events && !need_resched());
609
610         return ret;
611 }
612
613 static void kiocb_end_write(struct kiocb *kiocb)
614 {
615         if (kiocb->ki_flags & IOCB_WRITE) {
616                 struct inode *inode = file_inode(kiocb->ki_filp);
617
618                 /*
619                  * Tell lockdep we inherited freeze protection from submission
620                  * thread.
621                  */
622                 if (S_ISREG(inode->i_mode))
623                         __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
624                 file_end_write(kiocb->ki_filp);
625         }
626 }
627
628 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
629 {
630         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
631
632         kiocb_end_write(kiocb);
633
634         io_cqring_add_event(req->ctx, req->user_data, res, 0);
635         io_put_req(req);
636 }
637
638 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
639 {
640         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
641
642         kiocb_end_write(kiocb);
643
644         req->error = res;
645         if (res != -EAGAIN)
646                 req->flags |= REQ_F_IOPOLL_COMPLETED;
647 }
648
649 /*
650  * After the iocb has been issued, it's safe to be found on the poll list.
651  * Adding the kiocb to the list AFTER submission ensures that we don't
652  * find it from a io_iopoll_getevents() thread before the issuer is done
653  * accessing the kiocb cookie.
654  */
655 static void io_iopoll_req_issued(struct io_kiocb *req)
656 {
657         struct io_ring_ctx *ctx = req->ctx;
658
659         /*
660          * Track whether we have multiple files in our lists. This will impact
661          * how we do polling eventually, not spinning if we're on potentially
662          * different devices.
663          */
664         if (list_empty(&ctx->poll_list)) {
665                 ctx->poll_multi_file = false;
666         } else if (!ctx->poll_multi_file) {
667                 struct io_kiocb *list_req;
668
669                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
670                                                 list);
671                 if (list_req->rw.ki_filp != req->rw.ki_filp)
672                         ctx->poll_multi_file = true;
673         }
674
675         /*
676          * For fast devices, IO may have already completed. If it has, add
677          * it to the front so we find it first.
678          */
679         if (req->flags & REQ_F_IOPOLL_COMPLETED)
680                 list_add(&req->list, &ctx->poll_list);
681         else
682                 list_add_tail(&req->list, &ctx->poll_list);
683 }
684
685 static void io_file_put(struct io_submit_state *state, struct file *file)
686 {
687         if (!state) {
688                 fput(file);
689         } else if (state->file) {
690                 int diff = state->has_refs - state->used_refs;
691
692                 if (diff)
693                         fput_many(state->file, diff);
694                 state->file = NULL;
695         }
696 }
697
698 /*
699  * Get as many references to a file as we have IOs left in this submission,
700  * assuming most submissions are for one file, or at least that each file
701  * has more than one submission.
702  */
703 static struct file *io_file_get(struct io_submit_state *state, int fd)
704 {
705         if (!state)
706                 return fget(fd);
707
708         if (state->file) {
709                 if (state->fd == fd) {
710                         state->used_refs++;
711                         state->ios_left--;
712                         return state->file;
713                 }
714                 io_file_put(state, NULL);
715         }
716         state->file = fget_many(fd, state->ios_left);
717         if (!state->file)
718                 return NULL;
719
720         state->fd = fd;
721         state->has_refs = state->ios_left;
722         state->used_refs = 1;
723         state->ios_left--;
724         return state->file;
725 }
726
727 /*
728  * If we tracked the file through the SCM inflight mechanism, we could support
729  * any file. For now, just ensure that anything potentially problematic is done
730  * inline.
731  */
732 static bool io_file_supports_async(struct file *file)
733 {
734         umode_t mode = file_inode(file)->i_mode;
735
736         if (S_ISBLK(mode) || S_ISCHR(mode))
737                 return true;
738         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
739                 return true;
740
741         return false;
742 }
743
744 static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
745                       bool force_nonblock, struct io_submit_state *state)
746 {
747         const struct io_uring_sqe *sqe = s->sqe;
748         struct io_ring_ctx *ctx = req->ctx;
749         struct kiocb *kiocb = &req->rw;
750         unsigned ioprio;
751         int ret;
752
753         if (!req->file)
754                 return -EBADF;
755         /* For -EAGAIN retry, everything is already prepped */
756         if (req->flags & REQ_F_PREPPED)
757                 return 0;
758
759         if (force_nonblock && !io_file_supports_async(req->file))
760                 force_nonblock = false;
761
762         kiocb->ki_pos = READ_ONCE(sqe->off);
763         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
764         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
765
766         ioprio = READ_ONCE(sqe->ioprio);
767         if (ioprio) {
768                 ret = ioprio_check_cap(ioprio);
769                 if (ret)
770                         return ret;
771
772                 kiocb->ki_ioprio = ioprio;
773         } else
774                 kiocb->ki_ioprio = get_current_ioprio();
775
776         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
777         if (unlikely(ret))
778                 return ret;
779         if (force_nonblock) {
780                 kiocb->ki_flags |= IOCB_NOWAIT;
781                 req->flags |= REQ_F_FORCE_NONBLOCK;
782         }
783         if (ctx->flags & IORING_SETUP_IOPOLL) {
784                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
785                     !kiocb->ki_filp->f_op->iopoll)
786                         return -EOPNOTSUPP;
787
788                 req->error = 0;
789                 kiocb->ki_flags |= IOCB_HIPRI;
790                 kiocb->ki_complete = io_complete_rw_iopoll;
791         } else {
792                 if (kiocb->ki_flags & IOCB_HIPRI)
793                         return -EINVAL;
794                 kiocb->ki_complete = io_complete_rw;
795         }
796         req->flags |= REQ_F_PREPPED;
797         return 0;
798 }
799
800 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
801 {
802         switch (ret) {
803         case -EIOCBQUEUED:
804                 break;
805         case -ERESTARTSYS:
806         case -ERESTARTNOINTR:
807         case -ERESTARTNOHAND:
808         case -ERESTART_RESTARTBLOCK:
809                 /*
810                  * We can't just restart the syscall, since previously
811                  * submitted sqes may already be in progress. Just fail this
812                  * IO with EINTR.
813                  */
814                 ret = -EINTR;
815                 /* fall through */
816         default:
817                 kiocb->ki_complete(kiocb, ret, 0);
818         }
819 }
820
821 static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
822                            const struct io_uring_sqe *sqe,
823                            struct iov_iter *iter)
824 {
825         size_t len = READ_ONCE(sqe->len);
826         struct io_mapped_ubuf *imu;
827         unsigned index, buf_index;
828         size_t offset;
829         u64 buf_addr;
830
831         /* attempt to use fixed buffers without having provided iovecs */
832         if (unlikely(!ctx->user_bufs))
833                 return -EFAULT;
834
835         buf_index = READ_ONCE(sqe->buf_index);
836         if (unlikely(buf_index >= ctx->nr_user_bufs))
837                 return -EFAULT;
838
839         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
840         imu = &ctx->user_bufs[index];
841         buf_addr = READ_ONCE(sqe->addr);
842
843         /* overflow */
844         if (buf_addr + len < buf_addr)
845                 return -EFAULT;
846         /* not inside the mapped region */
847         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
848                 return -EFAULT;
849
850         /*
851          * May not be a start of buffer, set size appropriately
852          * and advance us to the beginning.
853          */
854         offset = buf_addr - imu->ubuf;
855         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
856         if (offset)
857                 iov_iter_advance(iter, offset);
858
859         /* don't drop a reference to these pages */
860         iter->type |= ITER_BVEC_FLAG_NO_REF;
861         return 0;
862 }
863
864 static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
865                            const struct sqe_submit *s, struct iovec **iovec,
866                            struct iov_iter *iter)
867 {
868         const struct io_uring_sqe *sqe = s->sqe;
869         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
870         size_t sqe_len = READ_ONCE(sqe->len);
871         u8 opcode;
872
873         /*
874          * We're reading ->opcode for the second time, but the first read
875          * doesn't care whether it's _FIXED or not, so it doesn't matter
876          * whether ->opcode changes concurrently. The first read does care
877          * about whether it is a READ or a WRITE, so we don't trust this read
878          * for that purpose and instead let the caller pass in the read/write
879          * flag.
880          */
881         opcode = READ_ONCE(sqe->opcode);
882         if (opcode == IORING_OP_READ_FIXED ||
883             opcode == IORING_OP_WRITE_FIXED) {
884                 int ret = io_import_fixed(ctx, rw, sqe, iter);
885                 *iovec = NULL;
886                 return ret;
887         }
888
889         if (!s->has_user)
890                 return -EFAULT;
891
892 #ifdef CONFIG_COMPAT
893         if (ctx->compat)
894                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
895                                                 iovec, iter);
896 #endif
897
898         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
899 }
900
901 /*
902  * Make a note of the last file/offset/direction we punted to async
903  * context. We'll use this information to see if we can piggy back a
904  * sequential request onto the previous one, if it's still hasn't been
905  * completed by the async worker.
906  */
907 static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
908 {
909         struct async_list *async_list = &req->ctx->pending_async[rw];
910         struct kiocb *kiocb = &req->rw;
911         struct file *filp = kiocb->ki_filp;
912         off_t io_end = kiocb->ki_pos + len;
913
914         if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
915                 unsigned long max_pages;
916
917                 /* Use 8x RA size as a decent limiter for both reads/writes */
918                 max_pages = filp->f_ra.ra_pages;
919                 if (!max_pages)
920                         max_pages = VM_READAHEAD_PAGES;
921                 max_pages *= 8;
922
923                 /* If max pages are exceeded, reset the state */
924                 len >>= PAGE_SHIFT;
925                 if (async_list->io_pages + len <= max_pages) {
926                         req->flags |= REQ_F_SEQ_PREV;
927                         async_list->io_pages += len;
928                 } else {
929                         io_end = 0;
930                         async_list->io_pages = 0;
931                 }
932         }
933
934         /* New file? Reset state. */
935         if (async_list->file != filp) {
936                 async_list->io_pages = 0;
937                 async_list->file = filp;
938         }
939         async_list->io_end = io_end;
940 }
941
942 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
943                    bool force_nonblock, struct io_submit_state *state)
944 {
945         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
946         struct kiocb *kiocb = &req->rw;
947         struct iov_iter iter;
948         struct file *file;
949         size_t iov_count;
950         int ret;
951
952         ret = io_prep_rw(req, s, force_nonblock, state);
953         if (ret)
954                 return ret;
955         file = kiocb->ki_filp;
956
957         if (unlikely(!(file->f_mode & FMODE_READ)))
958                 return -EBADF;
959         if (unlikely(!file->f_op->read_iter))
960                 return -EINVAL;
961
962         ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
963         if (ret)
964                 return ret;
965
966         iov_count = iov_iter_count(&iter);
967         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
968         if (!ret) {
969                 ssize_t ret2;
970
971                 /* Catch -EAGAIN return for forced non-blocking submission */
972                 ret2 = call_read_iter(file, kiocb, &iter);
973                 if (!force_nonblock || ret2 != -EAGAIN) {
974                         io_rw_done(kiocb, ret2);
975                 } else {
976                         /*
977                          * If ->needs_lock is true, we're already in async
978                          * context.
979                          */
980                         if (!s->needs_lock)
981                                 io_async_list_note(READ, req, iov_count);
982                         ret = -EAGAIN;
983                 }
984         }
985         kfree(iovec);
986         return ret;
987 }
988
989 static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
990                     bool force_nonblock, struct io_submit_state *state)
991 {
992         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
993         struct kiocb *kiocb = &req->rw;
994         struct iov_iter iter;
995         struct file *file;
996         size_t iov_count;
997         int ret;
998
999         ret = io_prep_rw(req, s, force_nonblock, state);
1000         if (ret)
1001                 return ret;
1002
1003         file = kiocb->ki_filp;
1004         if (unlikely(!(file->f_mode & FMODE_WRITE)))
1005                 return -EBADF;
1006         if (unlikely(!file->f_op->write_iter))
1007                 return -EINVAL;
1008
1009         ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1010         if (ret)
1011                 return ret;
1012
1013         iov_count = iov_iter_count(&iter);
1014
1015         ret = -EAGAIN;
1016         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1017                 /* If ->needs_lock is true, we're already in async context. */
1018                 if (!s->needs_lock)
1019                         io_async_list_note(WRITE, req, iov_count);
1020                 goto out_free;
1021         }
1022
1023         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1024         if (!ret) {
1025                 /*
1026                  * Open-code file_start_write here to grab freeze protection,
1027                  * which will be released by another thread in
1028                  * io_complete_rw().  Fool lockdep by telling it the lock got
1029                  * released so that it doesn't complain about the held lock when
1030                  * we return to userspace.
1031                  */
1032                 if (S_ISREG(file_inode(file)->i_mode)) {
1033                         __sb_start_write(file_inode(file)->i_sb,
1034                                                 SB_FREEZE_WRITE, true);
1035                         __sb_writers_release(file_inode(file)->i_sb,
1036                                                 SB_FREEZE_WRITE);
1037                 }
1038                 kiocb->ki_flags |= IOCB_WRITE;
1039                 io_rw_done(kiocb, call_write_iter(file, kiocb, &iter));
1040         }
1041 out_free:
1042         kfree(iovec);
1043         return ret;
1044 }
1045
1046 /*
1047  * IORING_OP_NOP just posts a completion event, nothing else.
1048  */
1049 static int io_nop(struct io_kiocb *req, u64 user_data)
1050 {
1051         struct io_ring_ctx *ctx = req->ctx;
1052         long err = 0;
1053
1054         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1055                 return -EINVAL;
1056
1057         io_cqring_add_event(ctx, user_data, err, 0);
1058         io_put_req(req);
1059         return 0;
1060 }
1061
1062 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1063 {
1064         struct io_ring_ctx *ctx = req->ctx;
1065
1066         if (!req->file)
1067                 return -EBADF;
1068         /* Prep already done (EAGAIN retry) */
1069         if (req->flags & REQ_F_PREPPED)
1070                 return 0;
1071
1072         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1073                 return -EINVAL;
1074         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1075                 return -EINVAL;
1076
1077         req->flags |= REQ_F_PREPPED;
1078         return 0;
1079 }
1080
1081 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1082                     bool force_nonblock)
1083 {
1084         loff_t sqe_off = READ_ONCE(sqe->off);
1085         loff_t sqe_len = READ_ONCE(sqe->len);
1086         loff_t end = sqe_off + sqe_len;
1087         unsigned fsync_flags;
1088         int ret;
1089
1090         fsync_flags = READ_ONCE(sqe->fsync_flags);
1091         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1092                 return -EINVAL;
1093
1094         ret = io_prep_fsync(req, sqe);
1095         if (ret)
1096                 return ret;
1097
1098         /* fsync always requires a blocking context */
1099         if (force_nonblock)
1100                 return -EAGAIN;
1101
1102         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1103                                 end > 0 ? end : LLONG_MAX,
1104                                 fsync_flags & IORING_FSYNC_DATASYNC);
1105
1106         io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1107         io_put_req(req);
1108         return 0;
1109 }
1110
1111 static void io_poll_remove_one(struct io_kiocb *req)
1112 {
1113         struct io_poll_iocb *poll = &req->poll;
1114
1115         spin_lock(&poll->head->lock);
1116         WRITE_ONCE(poll->canceled, true);
1117         if (!list_empty(&poll->wait.entry)) {
1118                 list_del_init(&poll->wait.entry);
1119                 queue_work(req->ctx->sqo_wq, &req->work);
1120         }
1121         spin_unlock(&poll->head->lock);
1122
1123         list_del_init(&req->list);
1124 }
1125
1126 static void io_poll_remove_all(struct io_ring_ctx *ctx)
1127 {
1128         struct io_kiocb *req;
1129
1130         spin_lock_irq(&ctx->completion_lock);
1131         while (!list_empty(&ctx->cancel_list)) {
1132                 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1133                 io_poll_remove_one(req);
1134         }
1135         spin_unlock_irq(&ctx->completion_lock);
1136 }
1137
1138 /*
1139  * Find a running poll command that matches one specified in sqe->addr,
1140  * and remove it if found.
1141  */
1142 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1143 {
1144         struct io_ring_ctx *ctx = req->ctx;
1145         struct io_kiocb *poll_req, *next;
1146         int ret = -ENOENT;
1147
1148         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1149                 return -EINVAL;
1150         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1151             sqe->poll_events)
1152                 return -EINVAL;
1153
1154         spin_lock_irq(&ctx->completion_lock);
1155         list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1156                 if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1157                         io_poll_remove_one(poll_req);
1158                         ret = 0;
1159                         break;
1160                 }
1161         }
1162         spin_unlock_irq(&ctx->completion_lock);
1163
1164         io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1165         io_put_req(req);
1166         return 0;
1167 }
1168
1169 static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1170                              __poll_t mask)
1171 {
1172         req->poll.done = true;
1173         io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask), 0);
1174         io_commit_cqring(ctx);
1175 }
1176
1177 static void io_poll_complete_work(struct work_struct *work)
1178 {
1179         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1180         struct io_poll_iocb *poll = &req->poll;
1181         struct poll_table_struct pt = { ._key = poll->events };
1182         struct io_ring_ctx *ctx = req->ctx;
1183         __poll_t mask = 0;
1184
1185         if (!READ_ONCE(poll->canceled))
1186                 mask = vfs_poll(poll->file, &pt) & poll->events;
1187
1188         /*
1189          * Note that ->ki_cancel callers also delete iocb from active_reqs after
1190          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
1191          * synchronize with them.  In the cancellation case the list_del_init
1192          * itself is not actually needed, but harmless so we keep it in to
1193          * avoid further branches in the fast path.
1194          */
1195         spin_lock_irq(&ctx->completion_lock);
1196         if (!mask && !READ_ONCE(poll->canceled)) {
1197                 add_wait_queue(poll->head, &poll->wait);
1198                 spin_unlock_irq(&ctx->completion_lock);
1199                 return;
1200         }
1201         list_del_init(&req->list);
1202         io_poll_complete(ctx, req, mask);
1203         spin_unlock_irq(&ctx->completion_lock);
1204
1205         io_cqring_ev_posted(ctx);
1206         io_put_req(req);
1207 }
1208
1209 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1210                         void *key)
1211 {
1212         struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1213                                                         wait);
1214         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1215         struct io_ring_ctx *ctx = req->ctx;
1216         __poll_t mask = key_to_poll(key);
1217         unsigned long flags;
1218
1219         /* for instances that support it check for an event match first: */
1220         if (mask && !(mask & poll->events))
1221                 return 0;
1222
1223         list_del_init(&poll->wait.entry);
1224
1225         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1226                 list_del(&req->list);
1227                 io_poll_complete(ctx, req, mask);
1228                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1229
1230                 io_cqring_ev_posted(ctx);
1231                 io_put_req(req);
1232         } else {
1233                 queue_work(ctx->sqo_wq, &req->work);
1234         }
1235
1236         return 1;
1237 }
1238
1239 struct io_poll_table {
1240         struct poll_table_struct pt;
1241         struct io_kiocb *req;
1242         int error;
1243 };
1244
1245 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1246                                struct poll_table_struct *p)
1247 {
1248         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1249
1250         if (unlikely(pt->req->poll.head)) {
1251                 pt->error = -EINVAL;
1252                 return;
1253         }
1254
1255         pt->error = 0;
1256         pt->req->poll.head = head;
1257         add_wait_queue(head, &pt->req->poll.wait);
1258 }
1259
1260 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1261 {
1262         struct io_poll_iocb *poll = &req->poll;
1263         struct io_ring_ctx *ctx = req->ctx;
1264         struct io_poll_table ipt;
1265         bool cancel = false;
1266         __poll_t mask;
1267         u16 events;
1268
1269         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1270                 return -EINVAL;
1271         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1272                 return -EINVAL;
1273         if (!poll->file)
1274                 return -EBADF;
1275
1276         INIT_WORK(&req->work, io_poll_complete_work);
1277         events = READ_ONCE(sqe->poll_events);
1278         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1279
1280         poll->head = NULL;
1281         poll->done = false;
1282         poll->canceled = false;
1283
1284         ipt.pt._qproc = io_poll_queue_proc;
1285         ipt.pt._key = poll->events;
1286         ipt.req = req;
1287         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1288
1289         /* initialized the list so that we can do list_empty checks */
1290         INIT_LIST_HEAD(&poll->wait.entry);
1291         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1292
1293         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1294
1295         spin_lock_irq(&ctx->completion_lock);
1296         if (likely(poll->head)) {
1297                 spin_lock(&poll->head->lock);
1298                 if (unlikely(list_empty(&poll->wait.entry))) {
1299                         if (ipt.error)
1300                                 cancel = true;
1301                         ipt.error = 0;
1302                         mask = 0;
1303                 }
1304                 if (mask || ipt.error)
1305                         list_del_init(&poll->wait.entry);
1306                 else if (cancel)
1307                         WRITE_ONCE(poll->canceled, true);
1308                 else if (!poll->done) /* actually waiting for an event */
1309                         list_add_tail(&req->list, &ctx->cancel_list);
1310                 spin_unlock(&poll->head->lock);
1311         }
1312         if (mask) { /* no async, we'd stolen it */
1313                 req->error = mangle_poll(mask);
1314                 ipt.error = 0;
1315                 io_poll_complete(ctx, req, mask);
1316         }
1317         spin_unlock_irq(&ctx->completion_lock);
1318
1319         if (mask) {
1320                 io_cqring_ev_posted(ctx);
1321                 io_put_req(req);
1322         }
1323         return ipt.error;
1324 }
1325
1326 static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1327                            const struct sqe_submit *s, bool force_nonblock,
1328                            struct io_submit_state *state)
1329 {
1330         int ret, opcode;
1331
1332         if (unlikely(s->index >= ctx->sq_entries))
1333                 return -EINVAL;
1334         req->user_data = READ_ONCE(s->sqe->user_data);
1335
1336         opcode = READ_ONCE(s->sqe->opcode);
1337         switch (opcode) {
1338         case IORING_OP_NOP:
1339                 ret = io_nop(req, req->user_data);
1340                 break;
1341         case IORING_OP_READV:
1342                 if (unlikely(s->sqe->buf_index))
1343                         return -EINVAL;
1344                 ret = io_read(req, s, force_nonblock, state);
1345                 break;
1346         case IORING_OP_WRITEV:
1347                 if (unlikely(s->sqe->buf_index))
1348                         return -EINVAL;
1349                 ret = io_write(req, s, force_nonblock, state);
1350                 break;
1351         case IORING_OP_READ_FIXED:
1352                 ret = io_read(req, s, force_nonblock, state);
1353                 break;
1354         case IORING_OP_WRITE_FIXED:
1355                 ret = io_write(req, s, force_nonblock, state);
1356                 break;
1357         case IORING_OP_FSYNC:
1358                 ret = io_fsync(req, s->sqe, force_nonblock);
1359                 break;
1360         case IORING_OP_POLL_ADD:
1361                 ret = io_poll_add(req, s->sqe);
1362                 break;
1363         case IORING_OP_POLL_REMOVE:
1364                 ret = io_poll_remove(req, s->sqe);
1365                 break;
1366         default:
1367                 ret = -EINVAL;
1368                 break;
1369         }
1370
1371         if (ret)
1372                 return ret;
1373
1374         if (ctx->flags & IORING_SETUP_IOPOLL) {
1375                 if (req->error == -EAGAIN)
1376                         return -EAGAIN;
1377
1378                 /* workqueue context doesn't hold uring_lock, grab it now */
1379                 if (s->needs_lock)
1380                         mutex_lock(&ctx->uring_lock);
1381                 io_iopoll_req_issued(req);
1382                 if (s->needs_lock)
1383                         mutex_unlock(&ctx->uring_lock);
1384         }
1385
1386         return 0;
1387 }
1388
1389 static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
1390                                                  const struct io_uring_sqe *sqe)
1391 {
1392         switch (sqe->opcode) {
1393         case IORING_OP_READV:
1394         case IORING_OP_READ_FIXED:
1395                 return &ctx->pending_async[READ];
1396         case IORING_OP_WRITEV:
1397         case IORING_OP_WRITE_FIXED:
1398                 return &ctx->pending_async[WRITE];
1399         default:
1400                 return NULL;
1401         }
1402 }
1403
1404 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1405 {
1406         u8 opcode = READ_ONCE(sqe->opcode);
1407
1408         return !(opcode == IORING_OP_READ_FIXED ||
1409                  opcode == IORING_OP_WRITE_FIXED);
1410 }
1411
1412 static void io_sq_wq_submit_work(struct work_struct *work)
1413 {
1414         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1415         struct io_ring_ctx *ctx = req->ctx;
1416         struct mm_struct *cur_mm = NULL;
1417         struct async_list *async_list;
1418         LIST_HEAD(req_list);
1419         mm_segment_t old_fs;
1420         int ret;
1421
1422         async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
1423 restart:
1424         do {
1425                 struct sqe_submit *s = &req->submit;
1426                 const struct io_uring_sqe *sqe = s->sqe;
1427
1428                 /* Ensure we clear previously set forced non-block flag */
1429                 req->flags &= ~REQ_F_FORCE_NONBLOCK;
1430                 req->rw.ki_flags &= ~IOCB_NOWAIT;
1431
1432                 ret = 0;
1433                 if (io_sqe_needs_user(sqe) && !cur_mm) {
1434                         if (!mmget_not_zero(ctx->sqo_mm)) {
1435                                 ret = -EFAULT;
1436                         } else {
1437                                 cur_mm = ctx->sqo_mm;
1438                                 use_mm(cur_mm);
1439                                 old_fs = get_fs();
1440                                 set_fs(USER_DS);
1441                         }
1442                 }
1443
1444                 if (!ret) {
1445                         s->has_user = cur_mm != NULL;
1446                         s->needs_lock = true;
1447                         do {
1448                                 ret = __io_submit_sqe(ctx, req, s, false, NULL);
1449                                 /*
1450                                  * We can get EAGAIN for polled IO even though
1451                                  * we're forcing a sync submission from here,
1452                                  * since we can't wait for request slots on the
1453                                  * block side.
1454                                  */
1455                                 if (ret != -EAGAIN)
1456                                         break;
1457                                 cond_resched();
1458                         } while (1);
1459
1460                         /* drop submission reference */
1461                         io_put_req(req);
1462                 }
1463                 if (ret) {
1464                         io_cqring_add_event(ctx, sqe->user_data, ret, 0);
1465                         io_put_req(req);
1466                 }
1467
1468                 /* async context always use a copy of the sqe */
1469                 kfree(sqe);
1470
1471                 if (!async_list)
1472                         break;
1473                 if (!list_empty(&req_list)) {
1474                         req = list_first_entry(&req_list, struct io_kiocb,
1475                                                 list);
1476                         list_del(&req->list);
1477                         continue;
1478                 }
1479                 if (list_empty(&async_list->list))
1480                         break;
1481
1482                 req = NULL;
1483                 spin_lock(&async_list->lock);
1484                 if (list_empty(&async_list->list)) {
1485                         spin_unlock(&async_list->lock);
1486                         break;
1487                 }
1488                 list_splice_init(&async_list->list, &req_list);
1489                 spin_unlock(&async_list->lock);
1490
1491                 req = list_first_entry(&req_list, struct io_kiocb, list);
1492                 list_del(&req->list);
1493         } while (req);
1494
1495         /*
1496          * Rare case of racing with a submitter. If we find the count has
1497          * dropped to zero AND we have pending work items, then restart
1498          * the processing. This is a tiny race window.
1499          */
1500         if (async_list) {
1501                 ret = atomic_dec_return(&async_list->cnt);
1502                 while (!ret && !list_empty(&async_list->list)) {
1503                         spin_lock(&async_list->lock);
1504                         atomic_inc(&async_list->cnt);
1505                         list_splice_init(&async_list->list, &req_list);
1506                         spin_unlock(&async_list->lock);
1507
1508                         if (!list_empty(&req_list)) {
1509                                 req = list_first_entry(&req_list,
1510                                                         struct io_kiocb, list);
1511                                 list_del(&req->list);
1512                                 goto restart;
1513                         }
1514                         ret = atomic_dec_return(&async_list->cnt);
1515                 }
1516         }
1517
1518         if (cur_mm) {
1519                 set_fs(old_fs);
1520                 unuse_mm(cur_mm);
1521                 mmput(cur_mm);
1522         }
1523 }
1524
1525 /*
1526  * See if we can piggy back onto previously submitted work, that is still
1527  * running. We currently only allow this if the new request is sequential
1528  * to the previous one we punted.
1529  */
1530 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
1531 {
1532         bool ret = false;
1533
1534         if (!list)
1535                 return false;
1536         if (!(req->flags & REQ_F_SEQ_PREV))
1537                 return false;
1538         if (!atomic_read(&list->cnt))
1539                 return false;
1540
1541         ret = true;
1542         spin_lock(&list->lock);
1543         list_add_tail(&req->list, &list->list);
1544         if (!atomic_read(&list->cnt)) {
1545                 list_del_init(&req->list);
1546                 ret = false;
1547         }
1548         spin_unlock(&list->lock);
1549         return ret;
1550 }
1551
1552 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
1553 {
1554         int op = READ_ONCE(sqe->opcode);
1555
1556         switch (op) {
1557         case IORING_OP_NOP:
1558         case IORING_OP_POLL_REMOVE:
1559                 return false;
1560         default:
1561                 return true;
1562         }
1563 }
1564
1565 static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
1566                            struct io_submit_state *state, struct io_kiocb *req)
1567 {
1568         unsigned flags;
1569         int fd;
1570
1571         flags = READ_ONCE(s->sqe->flags);
1572         fd = READ_ONCE(s->sqe->fd);
1573
1574         if (!io_op_needs_file(s->sqe)) {
1575                 req->file = NULL;
1576                 return 0;
1577         }
1578
1579         if (flags & IOSQE_FIXED_FILE) {
1580                 if (unlikely(!ctx->user_files ||
1581                     (unsigned) fd >= ctx->nr_user_files))
1582                         return -EBADF;
1583                 req->file = ctx->user_files[fd];
1584                 req->flags |= REQ_F_FIXED_FILE;
1585         } else {
1586                 if (s->needs_fixed_file)
1587                         return -EBADF;
1588                 req->file = io_file_get(state, fd);
1589                 if (unlikely(!req->file))
1590                         return -EBADF;
1591         }
1592
1593         return 0;
1594 }
1595
1596 static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1597                          struct io_submit_state *state)
1598 {
1599         struct io_kiocb *req;
1600         int ret;
1601
1602         /* enforce forwards compatibility on users */
1603         if (unlikely(s->sqe->flags & ~IOSQE_FIXED_FILE))
1604                 return -EINVAL;
1605
1606         req = io_get_req(ctx, state);
1607         if (unlikely(!req))
1608                 return -EAGAIN;
1609
1610         ret = io_req_set_file(ctx, s, state, req);
1611         if (unlikely(ret))
1612                 goto out;
1613
1614         ret = __io_submit_sqe(ctx, req, s, true, state);
1615         if (ret == -EAGAIN) {
1616                 struct io_uring_sqe *sqe_copy;
1617
1618                 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1619                 if (sqe_copy) {
1620                         struct async_list *list;
1621
1622                         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1623                         s->sqe = sqe_copy;
1624
1625                         memcpy(&req->submit, s, sizeof(*s));
1626                         list = io_async_list_from_sqe(ctx, s->sqe);
1627                         if (!io_add_to_prev_work(list, req)) {
1628                                 if (list)
1629                                         atomic_inc(&list->cnt);
1630                                 INIT_WORK(&req->work, io_sq_wq_submit_work);
1631                                 queue_work(ctx->sqo_wq, &req->work);
1632                         }
1633
1634                         /*
1635                          * Queued up for async execution, worker will release
1636                          * submit reference when the iocb is actually
1637                          * submitted.
1638                          */
1639                         return 0;
1640                 }
1641         }
1642
1643 out:
1644         /* drop submission reference */
1645         io_put_req(req);
1646
1647         /* and drop final reference, if we failed */
1648         if (ret)
1649                 io_put_req(req);
1650
1651         return ret;
1652 }
1653
1654 /*
1655  * Batched submission is done, ensure local IO is flushed out.
1656  */
1657 static void io_submit_state_end(struct io_submit_state *state)
1658 {
1659         blk_finish_plug(&state->plug);
1660         io_file_put(state, NULL);
1661         if (state->free_reqs)
1662                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
1663                                         &state->reqs[state->cur_req]);
1664 }
1665
1666 /*
1667  * Start submission side cache.
1668  */
1669 static void io_submit_state_start(struct io_submit_state *state,
1670                                   struct io_ring_ctx *ctx, unsigned max_ios)
1671 {
1672         blk_start_plug(&state->plug);
1673         state->free_reqs = 0;
1674         state->file = NULL;
1675         state->ios_left = max_ios;
1676 }
1677
1678 static void io_commit_sqring(struct io_ring_ctx *ctx)
1679 {
1680         struct io_sq_ring *ring = ctx->sq_ring;
1681
1682         if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1683                 /*
1684                  * Ensure any loads from the SQEs are done at this point,
1685                  * since once we write the new head, the application could
1686                  * write new data to them.
1687                  */
1688                 smp_store_release(&ring->r.head, ctx->cached_sq_head);
1689
1690                 /*
1691                  * write side barrier of head update, app has read side. See
1692                  * comment at the top of this file
1693                  */
1694                 smp_wmb();
1695         }
1696 }
1697
1698 /*
1699  * Undo last io_get_sqring()
1700  */
1701 static void io_drop_sqring(struct io_ring_ctx *ctx)
1702 {
1703         ctx->cached_sq_head--;
1704 }
1705
1706 /*
1707  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1708  * that is mapped by userspace. This means that care needs to be taken to
1709  * ensure that reads are stable, as we cannot rely on userspace always
1710  * being a good citizen. If members of the sqe are validated and then later
1711  * used, it's important that those reads are done through READ_ONCE() to
1712  * prevent a re-load down the line.
1713  */
1714 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1715 {
1716         struct io_sq_ring *ring = ctx->sq_ring;
1717         unsigned head;
1718
1719         /*
1720          * The cached sq head (or cq tail) serves two purposes:
1721          *
1722          * 1) allows us to batch the cost of updating the user visible
1723          *    head updates.
1724          * 2) allows the kernel side to track the head on its own, even
1725          *    though the application is the one updating it.
1726          */
1727         head = ctx->cached_sq_head;
1728         /* See comment at the top of this file */
1729         smp_rmb();
1730         if (head == READ_ONCE(ring->r.tail))
1731                 return false;
1732
1733         head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1734         if (head < ctx->sq_entries) {
1735                 s->index = head;
1736                 s->sqe = &ctx->sq_sqes[head];
1737                 ctx->cached_sq_head++;
1738                 return true;
1739         }
1740
1741         /* drop invalid entries */
1742         ctx->cached_sq_head++;
1743         ring->dropped++;
1744         /* See comment at the top of this file */
1745         smp_wmb();
1746         return false;
1747 }
1748
1749 static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
1750                           unsigned int nr, bool has_user, bool mm_fault)
1751 {
1752         struct io_submit_state state, *statep = NULL;
1753         int ret, i, submitted = 0;
1754
1755         if (nr > IO_PLUG_THRESHOLD) {
1756                 io_submit_state_start(&state, ctx, nr);
1757                 statep = &state;
1758         }
1759
1760         for (i = 0; i < nr; i++) {
1761                 if (unlikely(mm_fault)) {
1762                         ret = -EFAULT;
1763                 } else {
1764                         sqes[i].has_user = has_user;
1765                         sqes[i].needs_lock = true;
1766                         sqes[i].needs_fixed_file = true;
1767                         ret = io_submit_sqe(ctx, &sqes[i], statep);
1768                 }
1769                 if (!ret) {
1770                         submitted++;
1771                         continue;
1772                 }
1773
1774                 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
1775         }
1776
1777         if (statep)
1778                 io_submit_state_end(&state);
1779
1780         return submitted;
1781 }
1782
1783 static int io_sq_thread(void *data)
1784 {
1785         struct sqe_submit sqes[IO_IOPOLL_BATCH];
1786         struct io_ring_ctx *ctx = data;
1787         struct mm_struct *cur_mm = NULL;
1788         mm_segment_t old_fs;
1789         DEFINE_WAIT(wait);
1790         unsigned inflight;
1791         unsigned long timeout;
1792
1793         old_fs = get_fs();
1794         set_fs(USER_DS);
1795
1796         timeout = inflight = 0;
1797         while (!kthread_should_stop() && !ctx->sqo_stop) {
1798                 bool all_fixed, mm_fault = false;
1799                 int i;
1800
1801                 if (inflight) {
1802                         unsigned nr_events = 0;
1803
1804                         if (ctx->flags & IORING_SETUP_IOPOLL) {
1805                                 /*
1806                                  * We disallow the app entering submit/complete
1807                                  * with polling, but we still need to lock the
1808                                  * ring to prevent racing with polled issue
1809                                  * that got punted to a workqueue.
1810                                  */
1811                                 mutex_lock(&ctx->uring_lock);
1812                                 io_iopoll_check(ctx, &nr_events, 0);
1813                                 mutex_unlock(&ctx->uring_lock);
1814                         } else {
1815                                 /*
1816                                  * Normal IO, just pretend everything completed.
1817                                  * We don't have to poll completions for that.
1818                                  */
1819                                 nr_events = inflight;
1820                         }
1821
1822                         inflight -= nr_events;
1823                         if (!inflight)
1824                                 timeout = jiffies + ctx->sq_thread_idle;
1825                 }
1826
1827                 if (!io_get_sqring(ctx, &sqes[0])) {
1828                         /*
1829                          * We're polling. If we're within the defined idle
1830                          * period, then let us spin without work before going
1831                          * to sleep.
1832                          */
1833                         if (inflight || !time_after(jiffies, timeout)) {
1834                                 cpu_relax();
1835                                 continue;
1836                         }
1837
1838                         /*
1839                          * Drop cur_mm before scheduling, we can't hold it for
1840                          * long periods (or over schedule()). Do this before
1841                          * adding ourselves to the waitqueue, as the unuse/drop
1842                          * may sleep.
1843                          */
1844                         if (cur_mm) {
1845                                 unuse_mm(cur_mm);
1846                                 mmput(cur_mm);
1847                                 cur_mm = NULL;
1848                         }
1849
1850                         prepare_to_wait(&ctx->sqo_wait, &wait,
1851                                                 TASK_INTERRUPTIBLE);
1852
1853                         /* Tell userspace we may need a wakeup call */
1854                         ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
1855                         smp_wmb();
1856
1857                         if (!io_get_sqring(ctx, &sqes[0])) {
1858                                 if (kthread_should_stop()) {
1859                                         finish_wait(&ctx->sqo_wait, &wait);
1860                                         break;
1861                                 }
1862                                 if (signal_pending(current))
1863                                         flush_signals(current);
1864                                 schedule();
1865                                 finish_wait(&ctx->sqo_wait, &wait);
1866
1867                                 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
1868                                 smp_wmb();
1869                                 continue;
1870                         }
1871                         finish_wait(&ctx->sqo_wait, &wait);
1872
1873                         ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
1874                         smp_wmb();
1875                 }
1876
1877                 i = 0;
1878                 all_fixed = true;
1879                 do {
1880                         if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
1881                                 all_fixed = false;
1882
1883                         i++;
1884                         if (i == ARRAY_SIZE(sqes))
1885                                 break;
1886                 } while (io_get_sqring(ctx, &sqes[i]));
1887
1888                 /* Unless all new commands are FIXED regions, grab mm */
1889                 if (!all_fixed && !cur_mm) {
1890                         mm_fault = !mmget_not_zero(ctx->sqo_mm);
1891                         if (!mm_fault) {
1892                                 use_mm(ctx->sqo_mm);
1893                                 cur_mm = ctx->sqo_mm;
1894                         }
1895                 }
1896
1897                 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
1898                                                 mm_fault);
1899
1900                 /* Commit SQ ring head once we've consumed all SQEs */
1901                 io_commit_sqring(ctx);
1902         }
1903
1904         set_fs(old_fs);
1905         if (cur_mm) {
1906                 unuse_mm(cur_mm);
1907                 mmput(cur_mm);
1908         }
1909         return 0;
1910 }
1911
1912 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
1913 {
1914         struct io_submit_state state, *statep = NULL;
1915         int i, ret = 0, submit = 0;
1916
1917         if (to_submit > IO_PLUG_THRESHOLD) {
1918                 io_submit_state_start(&state, ctx, to_submit);
1919                 statep = &state;
1920         }
1921
1922         for (i = 0; i < to_submit; i++) {
1923                 struct sqe_submit s;
1924
1925                 if (!io_get_sqring(ctx, &s))
1926                         break;
1927
1928                 s.has_user = true;
1929                 s.needs_lock = false;
1930                 s.needs_fixed_file = false;
1931
1932                 ret = io_submit_sqe(ctx, &s, statep);
1933                 if (ret) {
1934                         io_drop_sqring(ctx);
1935                         break;
1936                 }
1937
1938                 submit++;
1939         }
1940         io_commit_sqring(ctx);
1941
1942         if (statep)
1943                 io_submit_state_end(statep);
1944
1945         return submit ? submit : ret;
1946 }
1947
1948 static unsigned io_cqring_events(struct io_cq_ring *ring)
1949 {
1950         return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
1951 }
1952
1953 /*
1954  * Wait until events become available, if we don't already have some. The
1955  * application must reap them itself, as they reside on the shared cq ring.
1956  */
1957 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
1958                           const sigset_t __user *sig, size_t sigsz)
1959 {
1960         struct io_cq_ring *ring = ctx->cq_ring;
1961         sigset_t ksigmask, sigsaved;
1962         DEFINE_WAIT(wait);
1963         int ret;
1964
1965         /* See comment at the top of this file */
1966         smp_rmb();
1967         if (io_cqring_events(ring) >= min_events)
1968                 return 0;
1969
1970         if (sig) {
1971                 ret = set_user_sigmask(sig, &ksigmask, &sigsaved, sigsz);
1972                 if (ret)
1973                         return ret;
1974         }
1975
1976         do {
1977                 prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE);
1978
1979                 ret = 0;
1980                 /* See comment at the top of this file */
1981                 smp_rmb();
1982                 if (io_cqring_events(ring) >= min_events)
1983                         break;
1984
1985                 schedule();
1986
1987                 ret = -EINTR;
1988                 if (signal_pending(current))
1989                         break;
1990         } while (1);
1991
1992         finish_wait(&ctx->wait, &wait);
1993
1994         if (sig)
1995                 restore_user_sigmask(sig, &sigsaved);
1996
1997         return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
1998 }
1999
2000 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2001 {
2002 #if defined(CONFIG_UNIX)
2003         if (ctx->ring_sock) {
2004                 struct sock *sock = ctx->ring_sock->sk;
2005                 struct sk_buff *skb;
2006
2007                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2008                         kfree_skb(skb);
2009         }
2010 #else
2011         int i;
2012
2013         for (i = 0; i < ctx->nr_user_files; i++)
2014                 fput(ctx->user_files[i]);
2015 #endif
2016 }
2017
2018 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2019 {
2020         if (!ctx->user_files)
2021                 return -ENXIO;
2022
2023         __io_sqe_files_unregister(ctx);
2024         kfree(ctx->user_files);
2025         ctx->user_files = NULL;
2026         ctx->nr_user_files = 0;
2027         return 0;
2028 }
2029
2030 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2031 {
2032         if (ctx->sqo_thread) {
2033                 ctx->sqo_stop = 1;
2034                 mb();
2035                 kthread_stop(ctx->sqo_thread);
2036                 ctx->sqo_thread = NULL;
2037         }
2038 }
2039
2040 static void io_finish_async(struct io_ring_ctx *ctx)
2041 {
2042         io_sq_thread_stop(ctx);
2043
2044         if (ctx->sqo_wq) {
2045                 destroy_workqueue(ctx->sqo_wq);
2046                 ctx->sqo_wq = NULL;
2047         }
2048 }
2049
2050 #if defined(CONFIG_UNIX)
2051 static void io_destruct_skb(struct sk_buff *skb)
2052 {
2053         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
2054
2055         io_finish_async(ctx);
2056         unix_destruct_scm(skb);
2057 }
2058
2059 /*
2060  * Ensure the UNIX gc is aware of our file set, so we are certain that
2061  * the io_uring can be safely unregistered on process exit, even if we have
2062  * loops in the file referencing.
2063  */
2064 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
2065 {
2066         struct sock *sk = ctx->ring_sock->sk;
2067         struct scm_fp_list *fpl;
2068         struct sk_buff *skb;
2069         int i;
2070
2071         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
2072                 unsigned long inflight = ctx->user->unix_inflight + nr;
2073
2074                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
2075                         return -EMFILE;
2076         }
2077
2078         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
2079         if (!fpl)
2080                 return -ENOMEM;
2081
2082         skb = alloc_skb(0, GFP_KERNEL);
2083         if (!skb) {
2084                 kfree(fpl);
2085                 return -ENOMEM;
2086         }
2087
2088         skb->sk = sk;
2089         skb->destructor = io_destruct_skb;
2090
2091         fpl->user = get_uid(ctx->user);
2092         for (i = 0; i < nr; i++) {
2093                 fpl->fp[i] = get_file(ctx->user_files[i + offset]);
2094                 unix_inflight(fpl->user, fpl->fp[i]);
2095         }
2096
2097         fpl->max = fpl->count = nr;
2098         UNIXCB(skb).fp = fpl;
2099         refcount_add(skb->truesize, &sk->sk_wmem_alloc);
2100         skb_queue_head(&sk->sk_receive_queue, skb);
2101
2102         for (i = 0; i < nr; i++)
2103                 fput(fpl->fp[i]);
2104
2105         return 0;
2106 }
2107
2108 /*
2109  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
2110  * causes regular reference counting to break down. We rely on the UNIX
2111  * garbage collection to take care of this problem for us.
2112  */
2113 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2114 {
2115         unsigned left, total;
2116         int ret = 0;
2117
2118         total = 0;
2119         left = ctx->nr_user_files;
2120         while (left) {
2121                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
2122                 int ret;
2123
2124                 ret = __io_sqe_files_scm(ctx, this_files, total);
2125                 if (ret)
2126                         break;
2127                 left -= this_files;
2128                 total += this_files;
2129         }
2130
2131         if (!ret)
2132                 return 0;
2133
2134         while (total < ctx->nr_user_files) {
2135                 fput(ctx->user_files[total]);
2136                 total++;
2137         }
2138
2139         return ret;
2140 }
2141 #else
2142 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2143 {
2144         return 0;
2145 }
2146 #endif
2147
2148 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
2149                                  unsigned nr_args)
2150 {
2151         __s32 __user *fds = (__s32 __user *) arg;
2152         int fd, ret = 0;
2153         unsigned i;
2154
2155         if (ctx->user_files)
2156                 return -EBUSY;
2157         if (!nr_args)
2158                 return -EINVAL;
2159         if (nr_args > IORING_MAX_FIXED_FILES)
2160                 return -EMFILE;
2161
2162         ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
2163         if (!ctx->user_files)
2164                 return -ENOMEM;
2165
2166         for (i = 0; i < nr_args; i++) {
2167                 ret = -EFAULT;
2168                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
2169                         break;
2170
2171                 ctx->user_files[i] = fget(fd);
2172
2173                 ret = -EBADF;
2174                 if (!ctx->user_files[i])
2175                         break;
2176                 /*
2177                  * Don't allow io_uring instances to be registered. If UNIX
2178                  * isn't enabled, then this causes a reference cycle and this
2179                  * instance can never get freed. If UNIX is enabled we'll
2180                  * handle it just fine, but there's still no point in allowing
2181                  * a ring fd as it doesn't support regular read/write anyway.
2182                  */
2183                 if (ctx->user_files[i]->f_op == &io_uring_fops) {
2184                         fput(ctx->user_files[i]);
2185                         break;
2186                 }
2187                 ctx->nr_user_files++;
2188                 ret = 0;
2189         }
2190
2191         if (ret) {
2192                 for (i = 0; i < ctx->nr_user_files; i++)
2193                         fput(ctx->user_files[i]);
2194
2195                 kfree(ctx->user_files);
2196                 ctx->nr_user_files = 0;
2197                 return ret;
2198         }
2199
2200         ret = io_sqe_files_scm(ctx);
2201         if (ret)
2202                 io_sqe_files_unregister(ctx);
2203
2204         return ret;
2205 }
2206
2207 static int io_sq_offload_start(struct io_ring_ctx *ctx,
2208                                struct io_uring_params *p)
2209 {
2210         int ret;
2211
2212         init_waitqueue_head(&ctx->sqo_wait);
2213         mmgrab(current->mm);
2214         ctx->sqo_mm = current->mm;
2215
2216         ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
2217         if (!ctx->sq_thread_idle)
2218                 ctx->sq_thread_idle = HZ;
2219
2220         ret = -EINVAL;
2221         if (!cpu_possible(p->sq_thread_cpu))
2222                 goto err;
2223
2224         if (ctx->flags & IORING_SETUP_SQPOLL) {
2225                 if (p->flags & IORING_SETUP_SQ_AFF) {
2226                         int cpu;
2227
2228                         cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS);
2229                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
2230                                                         ctx, cpu,
2231                                                         "io_uring-sq");
2232                 } else {
2233                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
2234                                                         "io_uring-sq");
2235                 }
2236                 if (IS_ERR(ctx->sqo_thread)) {
2237                         ret = PTR_ERR(ctx->sqo_thread);
2238                         ctx->sqo_thread = NULL;
2239                         goto err;
2240                 }
2241                 wake_up_process(ctx->sqo_thread);
2242         } else if (p->flags & IORING_SETUP_SQ_AFF) {
2243                 /* Can't have SQ_AFF without SQPOLL */
2244                 ret = -EINVAL;
2245                 goto err;
2246         }
2247
2248         /* Do QD, or 2 * CPUS, whatever is smallest */
2249         ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
2250                         min(ctx->sq_entries - 1, 2 * num_online_cpus()));
2251         if (!ctx->sqo_wq) {
2252                 ret = -ENOMEM;
2253                 goto err;
2254         }
2255
2256         return 0;
2257 err:
2258         io_sq_thread_stop(ctx);
2259         mmdrop(ctx->sqo_mm);
2260         ctx->sqo_mm = NULL;
2261         return ret;
2262 }
2263
2264 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
2265 {
2266         atomic_long_sub(nr_pages, &user->locked_vm);
2267 }
2268
2269 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
2270 {
2271         unsigned long page_limit, cur_pages, new_pages;
2272
2273         /* Don't allow more pages than we can safely lock */
2274         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
2275
2276         do {
2277                 cur_pages = atomic_long_read(&user->locked_vm);
2278                 new_pages = cur_pages + nr_pages;
2279                 if (new_pages > page_limit)
2280                         return -ENOMEM;
2281         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
2282                                         new_pages) != cur_pages);
2283
2284         return 0;
2285 }
2286
2287 static void io_mem_free(void *ptr)
2288 {
2289         struct page *page = virt_to_head_page(ptr);
2290
2291         if (put_page_testzero(page))
2292                 free_compound_page(page);
2293 }
2294
2295 static void *io_mem_alloc(size_t size)
2296 {
2297         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
2298                                 __GFP_NORETRY;
2299
2300         return (void *) __get_free_pages(gfp_flags, get_order(size));
2301 }
2302
2303 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
2304 {
2305         struct io_sq_ring *sq_ring;
2306         struct io_cq_ring *cq_ring;
2307         size_t bytes;
2308
2309         bytes = struct_size(sq_ring, array, sq_entries);
2310         bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
2311         bytes += struct_size(cq_ring, cqes, cq_entries);
2312
2313         return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
2314 }
2315
2316 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
2317 {
2318         int i, j;
2319
2320         if (!ctx->user_bufs)
2321                 return -ENXIO;
2322
2323         for (i = 0; i < ctx->nr_user_bufs; i++) {
2324                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2325
2326                 for (j = 0; j < imu->nr_bvecs; j++)
2327                         put_page(imu->bvec[j].bv_page);
2328
2329                 if (ctx->account_mem)
2330                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
2331                 kfree(imu->bvec);
2332                 imu->nr_bvecs = 0;
2333         }
2334
2335         kfree(ctx->user_bufs);
2336         ctx->user_bufs = NULL;
2337         ctx->nr_user_bufs = 0;
2338         return 0;
2339 }
2340
2341 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
2342                        void __user *arg, unsigned index)
2343 {
2344         struct iovec __user *src;
2345
2346 #ifdef CONFIG_COMPAT
2347         if (ctx->compat) {
2348                 struct compat_iovec __user *ciovs;
2349                 struct compat_iovec ciov;
2350
2351                 ciovs = (struct compat_iovec __user *) arg;
2352                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
2353                         return -EFAULT;
2354
2355                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
2356                 dst->iov_len = ciov.iov_len;
2357                 return 0;
2358         }
2359 #endif
2360         src = (struct iovec __user *) arg;
2361         if (copy_from_user(dst, &src[index], sizeof(*dst)))
2362                 return -EFAULT;
2363         return 0;
2364 }
2365
2366 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
2367                                   unsigned nr_args)
2368 {
2369         struct vm_area_struct **vmas = NULL;
2370         struct page **pages = NULL;
2371         int i, j, got_pages = 0;
2372         int ret = -EINVAL;
2373
2374         if (ctx->user_bufs)
2375                 return -EBUSY;
2376         if (!nr_args || nr_args > UIO_MAXIOV)
2377                 return -EINVAL;
2378
2379         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
2380                                         GFP_KERNEL);
2381         if (!ctx->user_bufs)
2382                 return -ENOMEM;
2383
2384         for (i = 0; i < nr_args; i++) {
2385                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2386                 unsigned long off, start, end, ubuf;
2387                 int pret, nr_pages;
2388                 struct iovec iov;
2389                 size_t size;
2390
2391                 ret = io_copy_iov(ctx, &iov, arg, i);
2392                 if (ret)
2393                         break;
2394
2395                 /*
2396                  * Don't impose further limits on the size and buffer
2397                  * constraints here, we'll -EINVAL later when IO is
2398                  * submitted if they are wrong.
2399                  */
2400                 ret = -EFAULT;
2401                 if (!iov.iov_base || !iov.iov_len)
2402                         goto err;
2403
2404                 /* arbitrary limit, but we need something */
2405                 if (iov.iov_len > SZ_1G)
2406                         goto err;
2407
2408                 ubuf = (unsigned long) iov.iov_base;
2409                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
2410                 start = ubuf >> PAGE_SHIFT;
2411                 nr_pages = end - start;
2412
2413                 if (ctx->account_mem) {
2414                         ret = io_account_mem(ctx->user, nr_pages);
2415                         if (ret)
2416                                 goto err;
2417                 }
2418
2419                 ret = 0;
2420                 if (!pages || nr_pages > got_pages) {
2421                         kfree(vmas);
2422                         kfree(pages);
2423                         pages = kmalloc_array(nr_pages, sizeof(struct page *),
2424                                                 GFP_KERNEL);
2425                         vmas = kmalloc_array(nr_pages,
2426                                         sizeof(struct vm_area_struct *),
2427                                         GFP_KERNEL);
2428                         if (!pages || !vmas) {
2429                                 ret = -ENOMEM;
2430                                 if (ctx->account_mem)
2431                                         io_unaccount_mem(ctx->user, nr_pages);
2432                                 goto err;
2433                         }
2434                         got_pages = nr_pages;
2435                 }
2436
2437                 imu->bvec = kmalloc_array(nr_pages, sizeof(struct bio_vec),
2438                                                 GFP_KERNEL);
2439                 ret = -ENOMEM;
2440                 if (!imu->bvec) {
2441                         if (ctx->account_mem)
2442                                 io_unaccount_mem(ctx->user, nr_pages);
2443                         goto err;
2444                 }
2445
2446                 ret = 0;
2447                 down_read(&current->mm->mmap_sem);
2448                 pret = get_user_pages_longterm(ubuf, nr_pages, FOLL_WRITE,
2449                                                 pages, vmas);
2450                 if (pret == nr_pages) {
2451                         /* don't support file backed memory */
2452                         for (j = 0; j < nr_pages; j++) {
2453                                 struct vm_area_struct *vma = vmas[j];
2454
2455                                 if (vma->vm_file &&
2456                                     !is_file_hugepages(vma->vm_file)) {
2457                                         ret = -EOPNOTSUPP;
2458                                         break;
2459                                 }
2460                         }
2461                 } else {
2462                         ret = pret < 0 ? pret : -EFAULT;
2463                 }
2464                 up_read(&current->mm->mmap_sem);
2465                 if (ret) {
2466                         /*
2467                          * if we did partial map, or found file backed vmas,
2468                          * release any pages we did get
2469                          */
2470                         if (pret > 0) {
2471                                 for (j = 0; j < pret; j++)
2472                                         put_page(pages[j]);
2473                         }
2474                         if (ctx->account_mem)
2475                                 io_unaccount_mem(ctx->user, nr_pages);
2476                         goto err;
2477                 }
2478
2479                 off = ubuf & ~PAGE_MASK;
2480                 size = iov.iov_len;
2481                 for (j = 0; j < nr_pages; j++) {
2482                         size_t vec_len;
2483
2484                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
2485                         imu->bvec[j].bv_page = pages[j];
2486                         imu->bvec[j].bv_len = vec_len;
2487                         imu->bvec[j].bv_offset = off;
2488                         off = 0;
2489                         size -= vec_len;
2490                 }
2491                 /* store original address for later verification */
2492                 imu->ubuf = ubuf;
2493                 imu->len = iov.iov_len;
2494                 imu->nr_bvecs = nr_pages;
2495
2496                 ctx->nr_user_bufs++;
2497         }
2498         kfree(pages);
2499         kfree(vmas);
2500         return 0;
2501 err:
2502         kfree(pages);
2503         kfree(vmas);
2504         io_sqe_buffer_unregister(ctx);
2505         return ret;
2506 }
2507
2508 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
2509 {
2510         io_finish_async(ctx);
2511         if (ctx->sqo_mm)
2512                 mmdrop(ctx->sqo_mm);
2513
2514         io_iopoll_reap_events(ctx);
2515         io_sqe_buffer_unregister(ctx);
2516         io_sqe_files_unregister(ctx);
2517
2518 #if defined(CONFIG_UNIX)
2519         if (ctx->ring_sock)
2520                 sock_release(ctx->ring_sock);
2521 #endif
2522
2523         io_mem_free(ctx->sq_ring);
2524         io_mem_free(ctx->sq_sqes);
2525         io_mem_free(ctx->cq_ring);
2526
2527         percpu_ref_exit(&ctx->refs);
2528         if (ctx->account_mem)
2529                 io_unaccount_mem(ctx->user,
2530                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
2531         free_uid(ctx->user);
2532         kfree(ctx);
2533 }
2534
2535 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2536 {
2537         struct io_ring_ctx *ctx = file->private_data;
2538         __poll_t mask = 0;
2539
2540         poll_wait(file, &ctx->cq_wait, wait);
2541         /* See comment at the top of this file */
2542         smp_rmb();
2543         if (READ_ONCE(ctx->sq_ring->r.tail) + 1 != ctx->cached_sq_head)
2544                 mask |= EPOLLOUT | EPOLLWRNORM;
2545         if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
2546                 mask |= EPOLLIN | EPOLLRDNORM;
2547
2548         return mask;
2549 }
2550
2551 static int io_uring_fasync(int fd, struct file *file, int on)
2552 {
2553         struct io_ring_ctx *ctx = file->private_data;
2554
2555         return fasync_helper(fd, file, on, &ctx->cq_fasync);
2556 }
2557
2558 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2559 {
2560         mutex_lock(&ctx->uring_lock);
2561         percpu_ref_kill(&ctx->refs);
2562         mutex_unlock(&ctx->uring_lock);
2563
2564         io_poll_remove_all(ctx);
2565         io_iopoll_reap_events(ctx);
2566         wait_for_completion(&ctx->ctx_done);
2567         io_ring_ctx_free(ctx);
2568 }
2569
2570 static int io_uring_release(struct inode *inode, struct file *file)
2571 {
2572         struct io_ring_ctx *ctx = file->private_data;
2573
2574         file->private_data = NULL;
2575         io_ring_ctx_wait_and_kill(ctx);
2576         return 0;
2577 }
2578
2579 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2580 {
2581         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
2582         unsigned long sz = vma->vm_end - vma->vm_start;
2583         struct io_ring_ctx *ctx = file->private_data;
2584         unsigned long pfn;
2585         struct page *page;
2586         void *ptr;
2587
2588         switch (offset) {
2589         case IORING_OFF_SQ_RING:
2590                 ptr = ctx->sq_ring;
2591                 break;
2592         case IORING_OFF_SQES:
2593                 ptr = ctx->sq_sqes;
2594                 break;
2595         case IORING_OFF_CQ_RING:
2596                 ptr = ctx->cq_ring;
2597                 break;
2598         default:
2599                 return -EINVAL;
2600         }
2601
2602         page = virt_to_head_page(ptr);
2603         if (sz > (PAGE_SIZE << compound_order(page)))
2604                 return -EINVAL;
2605
2606         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2607         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2608 }
2609
2610 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2611                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
2612                 size_t, sigsz)
2613 {
2614         struct io_ring_ctx *ctx;
2615         long ret = -EBADF;
2616         int submitted = 0;
2617         struct fd f;
2618
2619         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
2620                 return -EINVAL;
2621
2622         f = fdget(fd);
2623         if (!f.file)
2624                 return -EBADF;
2625
2626         ret = -EOPNOTSUPP;
2627         if (f.file->f_op != &io_uring_fops)
2628                 goto out_fput;
2629
2630         ret = -ENXIO;
2631         ctx = f.file->private_data;
2632         if (!percpu_ref_tryget(&ctx->refs))
2633                 goto out_fput;
2634
2635         /*
2636          * For SQ polling, the thread will do all submissions and completions.
2637          * Just return the requested submit count, and wake the thread if
2638          * we were asked to.
2639          */
2640         if (ctx->flags & IORING_SETUP_SQPOLL) {
2641                 if (flags & IORING_ENTER_SQ_WAKEUP)
2642                         wake_up(&ctx->sqo_wait);
2643                 submitted = to_submit;
2644                 goto out_ctx;
2645         }
2646
2647         ret = 0;
2648         if (to_submit) {
2649                 to_submit = min(to_submit, ctx->sq_entries);
2650
2651                 mutex_lock(&ctx->uring_lock);
2652                 submitted = io_ring_submit(ctx, to_submit);
2653                 mutex_unlock(&ctx->uring_lock);
2654
2655                 if (submitted < 0)
2656                         goto out_ctx;
2657         }
2658         if (flags & IORING_ENTER_GETEVENTS) {
2659                 unsigned nr_events = 0;
2660
2661                 min_complete = min(min_complete, ctx->cq_entries);
2662
2663                 /*
2664                  * The application could have included the 'to_submit' count
2665                  * in how many events it wanted to wait for. If we failed to
2666                  * submit the desired count, we may need to adjust the number
2667                  * of events to poll/wait for.
2668                  */
2669                 if (submitted < to_submit)
2670                         min_complete = min_t(unsigned, submitted, min_complete);
2671
2672                 if (ctx->flags & IORING_SETUP_IOPOLL) {
2673                         mutex_lock(&ctx->uring_lock);
2674                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
2675                         mutex_unlock(&ctx->uring_lock);
2676                 } else {
2677                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
2678                 }
2679         }
2680
2681 out_ctx:
2682         io_ring_drop_ctx_refs(ctx, 1);
2683 out_fput:
2684         fdput(f);
2685         return submitted ? submitted : ret;
2686 }
2687
2688 static const struct file_operations io_uring_fops = {
2689         .release        = io_uring_release,
2690         .mmap           = io_uring_mmap,
2691         .poll           = io_uring_poll,
2692         .fasync         = io_uring_fasync,
2693 };
2694
2695 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
2696                                   struct io_uring_params *p)
2697 {
2698         struct io_sq_ring *sq_ring;
2699         struct io_cq_ring *cq_ring;
2700         size_t size;
2701
2702         sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
2703         if (!sq_ring)
2704                 return -ENOMEM;
2705
2706         ctx->sq_ring = sq_ring;
2707         sq_ring->ring_mask = p->sq_entries - 1;
2708         sq_ring->ring_entries = p->sq_entries;
2709         ctx->sq_mask = sq_ring->ring_mask;
2710         ctx->sq_entries = sq_ring->ring_entries;
2711
2712         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
2713         if (size == SIZE_MAX)
2714                 return -EOVERFLOW;
2715
2716         ctx->sq_sqes = io_mem_alloc(size);
2717         if (!ctx->sq_sqes) {
2718                 io_mem_free(ctx->sq_ring);
2719                 return -ENOMEM;
2720         }
2721
2722         cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
2723         if (!cq_ring) {
2724                 io_mem_free(ctx->sq_ring);
2725                 io_mem_free(ctx->sq_sqes);
2726                 return -ENOMEM;
2727         }
2728
2729         ctx->cq_ring = cq_ring;
2730         cq_ring->ring_mask = p->cq_entries - 1;
2731         cq_ring->ring_entries = p->cq_entries;
2732         ctx->cq_mask = cq_ring->ring_mask;
2733         ctx->cq_entries = cq_ring->ring_entries;
2734         return 0;
2735 }
2736
2737 /*
2738  * Allocate an anonymous fd, this is what constitutes the application
2739  * visible backing of an io_uring instance. The application mmaps this
2740  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
2741  * we have to tie this fd to a socket for file garbage collection purposes.
2742  */
2743 static int io_uring_get_fd(struct io_ring_ctx *ctx)
2744 {
2745         struct file *file;
2746         int ret;
2747
2748 #if defined(CONFIG_UNIX)
2749         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
2750                                 &ctx->ring_sock);
2751         if (ret)
2752                 return ret;
2753 #endif
2754
2755         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
2756         if (ret < 0)
2757                 goto err;
2758
2759         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
2760                                         O_RDWR | O_CLOEXEC);
2761         if (IS_ERR(file)) {
2762                 put_unused_fd(ret);
2763                 ret = PTR_ERR(file);
2764                 goto err;
2765         }
2766
2767 #if defined(CONFIG_UNIX)
2768         ctx->ring_sock->file = file;
2769         ctx->ring_sock->sk->sk_user_data = ctx;
2770 #endif
2771         fd_install(ret, file);
2772         return ret;
2773 err:
2774 #if defined(CONFIG_UNIX)
2775         sock_release(ctx->ring_sock);
2776         ctx->ring_sock = NULL;
2777 #endif
2778         return ret;
2779 }
2780
2781 static int io_uring_create(unsigned entries, struct io_uring_params *p)
2782 {
2783         struct user_struct *user = NULL;
2784         struct io_ring_ctx *ctx;
2785         bool account_mem;
2786         int ret;
2787
2788         if (!entries || entries > IORING_MAX_ENTRIES)
2789                 return -EINVAL;
2790
2791         /*
2792          * Use twice as many entries for the CQ ring. It's possible for the
2793          * application to drive a higher depth than the size of the SQ ring,
2794          * since the sqes are only used at submission time. This allows for
2795          * some flexibility in overcommitting a bit.
2796          */
2797         p->sq_entries = roundup_pow_of_two(entries);
2798         p->cq_entries = 2 * p->sq_entries;
2799
2800         user = get_uid(current_user());
2801         account_mem = !capable(CAP_IPC_LOCK);
2802
2803         if (account_mem) {
2804                 ret = io_account_mem(user,
2805                                 ring_pages(p->sq_entries, p->cq_entries));
2806                 if (ret) {
2807                         free_uid(user);
2808                         return ret;
2809                 }
2810         }
2811
2812         ctx = io_ring_ctx_alloc(p);
2813         if (!ctx) {
2814                 if (account_mem)
2815                         io_unaccount_mem(user, ring_pages(p->sq_entries,
2816                                                                 p->cq_entries));
2817                 free_uid(user);
2818                 return -ENOMEM;
2819         }
2820         ctx->compat = in_compat_syscall();
2821         ctx->account_mem = account_mem;
2822         ctx->user = user;
2823
2824         ret = io_allocate_scq_urings(ctx, p);
2825         if (ret)
2826                 goto err;
2827
2828         ret = io_sq_offload_start(ctx, p);
2829         if (ret)
2830                 goto err;
2831
2832         ret = io_uring_get_fd(ctx);
2833         if (ret < 0)
2834                 goto err;
2835
2836         memset(&p->sq_off, 0, sizeof(p->sq_off));
2837         p->sq_off.head = offsetof(struct io_sq_ring, r.head);
2838         p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
2839         p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
2840         p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
2841         p->sq_off.flags = offsetof(struct io_sq_ring, flags);
2842         p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
2843         p->sq_off.array = offsetof(struct io_sq_ring, array);
2844
2845         memset(&p->cq_off, 0, sizeof(p->cq_off));
2846         p->cq_off.head = offsetof(struct io_cq_ring, r.head);
2847         p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
2848         p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
2849         p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
2850         p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
2851         p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
2852         return ret;
2853 err:
2854         io_ring_ctx_wait_and_kill(ctx);
2855         return ret;
2856 }
2857
2858 /*
2859  * Sets up an aio uring context, and returns the fd. Applications asks for a
2860  * ring size, we return the actual sq/cq ring sizes (among other things) in the
2861  * params structure passed in.
2862  */
2863 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
2864 {
2865         struct io_uring_params p;
2866         long ret;
2867         int i;
2868
2869         if (copy_from_user(&p, params, sizeof(p)))
2870                 return -EFAULT;
2871         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
2872                 if (p.resv[i])
2873                         return -EINVAL;
2874         }
2875
2876         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
2877                         IORING_SETUP_SQ_AFF))
2878                 return -EINVAL;
2879
2880         ret = io_uring_create(entries, &p);
2881         if (ret < 0)
2882                 return ret;
2883
2884         if (copy_to_user(params, &p, sizeof(p)))
2885                 return -EFAULT;
2886
2887         return ret;
2888 }
2889
2890 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
2891                 struct io_uring_params __user *, params)
2892 {
2893         return io_uring_setup(entries, params);
2894 }
2895
2896 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
2897                                void __user *arg, unsigned nr_args)
2898 {
2899         int ret;
2900
2901         percpu_ref_kill(&ctx->refs);
2902         wait_for_completion(&ctx->ctx_done);
2903
2904         switch (opcode) {
2905         case IORING_REGISTER_BUFFERS:
2906                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
2907                 break;
2908         case IORING_UNREGISTER_BUFFERS:
2909                 ret = -EINVAL;
2910                 if (arg || nr_args)
2911                         break;
2912                 ret = io_sqe_buffer_unregister(ctx);
2913                 break;
2914         case IORING_REGISTER_FILES:
2915                 ret = io_sqe_files_register(ctx, arg, nr_args);
2916                 break;
2917         case IORING_UNREGISTER_FILES:
2918                 ret = -EINVAL;
2919                 if (arg || nr_args)
2920                         break;
2921                 ret = io_sqe_files_unregister(ctx);
2922                 break;
2923         default:
2924                 ret = -EINVAL;
2925                 break;
2926         }
2927
2928         /* bring the ctx back to life */
2929         reinit_completion(&ctx->ctx_done);
2930         percpu_ref_reinit(&ctx->refs);
2931         return ret;
2932 }
2933
2934 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
2935                 void __user *, arg, unsigned int, nr_args)
2936 {
2937         struct io_ring_ctx *ctx;
2938         long ret = -EBADF;
2939         struct fd f;
2940
2941         f = fdget(fd);
2942         if (!f.file)
2943                 return -EBADF;
2944
2945         ret = -EOPNOTSUPP;
2946         if (f.file->f_op != &io_uring_fops)
2947                 goto out_fput;
2948
2949         ctx = f.file->private_data;
2950
2951         mutex_lock(&ctx->uring_lock);
2952         ret = __io_uring_register(ctx, opcode, arg, nr_args);
2953         mutex_unlock(&ctx->uring_lock);
2954 out_fput:
2955         fdput(f);
2956         return ret;
2957 }
2958
2959 static int __init io_uring_init(void)
2960 {
2961         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
2962         return 0;
2963 };
2964 __initcall(io_uring_init);