io_uring: move remaining file table manipulation to filetable.c
[linux-2.6-microblaze.git] / io_uring / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqe (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <net/compat.h>
48 #include <linux/refcount.h>
49 #include <linux/uio.h>
50 #include <linux/bits.h>
51
52 #include <linux/sched/signal.h>
53 #include <linux/fs.h>
54 #include <linux/file.h>
55 #include <linux/fdtable.h>
56 #include <linux/mm.h>
57 #include <linux/mman.h>
58 #include <linux/percpu.h>
59 #include <linux/slab.h>
60 #include <linux/blk-mq.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
63 #include <net/sock.h>
64 #include <net/af_unix.h>
65 #include <net/scm.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/highmem.h>
72 #include <linux/fsnotify.h>
73 #include <linux/fadvise.h>
74 #include <linux/eventpoll.h>
75 #include <linux/task_work.h>
76 #include <linux/pagemap.h>
77 #include <linux/io_uring.h>
78 #include <linux/audit.h>
79 #include <linux/security.h>
80
81 #define CREATE_TRACE_POINTS
82 #include <trace/events/io_uring.h>
83
84 #include <uapi/linux/io_uring.h>
85
86 #include "io-wq.h"
87
88 #include "io_uring_types.h"
89 #include "io_uring.h"
90 #include "opdef.h"
91 #include "refs.h"
92 #include "tctx.h"
93 #include "sqpoll.h"
94 #include "fdinfo.h"
95 #include "kbuf.h"
96 #include "rsrc.h"
97
98 #include "xattr.h"
99 #include "nop.h"
100 #include "fs.h"
101 #include "splice.h"
102 #include "sync.h"
103 #include "advise.h"
104 #include "openclose.h"
105 #include "uring_cmd.h"
106 #include "epoll.h"
107 #include "statx.h"
108 #include "net.h"
109 #include "msg_ring.h"
110 #include "timeout.h"
111 #include "poll.h"
112 #include "cancel.h"
113
114 #define IORING_MAX_ENTRIES      32768
115 #define IORING_MAX_CQ_ENTRIES   (2 * IORING_MAX_ENTRIES)
116
117 #define IORING_MAX_RESTRICTIONS (IORING_RESTRICTION_LAST + \
118                                  IORING_REGISTER_LAST + IORING_OP_LAST)
119
120 #define SQE_COMMON_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_LINK | \
121                           IOSQE_IO_HARDLINK | IOSQE_ASYNC)
122
123 #define SQE_VALID_FLAGS (SQE_COMMON_FLAGS | IOSQE_BUFFER_SELECT | \
124                         IOSQE_IO_DRAIN | IOSQE_CQE_SKIP_SUCCESS)
125
126 #define IO_REQ_CLEAN_FLAGS (REQ_F_BUFFER_SELECTED | REQ_F_NEED_CLEANUP | \
127                                 REQ_F_POLLED | REQ_F_INFLIGHT | REQ_F_CREDS | \
128                                 REQ_F_ASYNC_DATA)
129
130 #define IO_REQ_CLEAN_SLOW_FLAGS (REQ_F_REFCOUNT | REQ_F_LINK | REQ_F_HARDLINK |\
131                                  IO_REQ_CLEAN_FLAGS)
132
133 #define IO_TCTX_REFS_CACHE_NR   (1U << 10)
134
135 #define IO_COMPL_BATCH                  32
136 #define IO_REQ_CACHE_SIZE               32
137 #define IO_REQ_ALLOC_BATCH              8
138
139 /*
140  * First field must be the file pointer in all the
141  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
142  */
143 struct io_rw {
144         /* NOTE: kiocb has the file as the first member, so don't do it here */
145         struct kiocb                    kiocb;
146         u64                             addr;
147         u32                             len;
148         rwf_t                           flags;
149 };
150
151 struct io_rw_state {
152         struct iov_iter                 iter;
153         struct iov_iter_state           iter_state;
154         struct iovec                    fast_iov[UIO_FASTIOV];
155 };
156
157 struct io_async_rw {
158         struct io_rw_state              s;
159         const struct iovec              *free_iovec;
160         size_t                          bytes_done;
161         struct wait_page_queue          wpq;
162 };
163
164 enum {
165         IO_CHECK_CQ_OVERFLOW_BIT,
166         IO_CHECK_CQ_DROPPED_BIT,
167 };
168
169 struct io_defer_entry {
170         struct list_head        list;
171         struct io_kiocb         *req;
172         u32                     seq;
173 };
174
175 /* requests with any of those set should undergo io_disarm_next() */
176 #define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
177 #define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
178
179 static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
180                                          struct task_struct *task,
181                                          bool cancel_all);
182
183 static void io_dismantle_req(struct io_kiocb *req);
184 static void io_clean_op(struct io_kiocb *req);
185 static void io_queue_sqe(struct io_kiocb *req);
186
187 static void io_req_task_queue(struct io_kiocb *req);
188 static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
189 static int io_req_prep_async(struct io_kiocb *req);
190
191 static void io_eventfd_signal(struct io_ring_ctx *ctx);
192
193 static struct kmem_cache *req_cachep;
194
195 const char *io_uring_get_opcode(u8 opcode)
196 {
197         if (opcode < IORING_OP_LAST)
198                 return io_op_defs[opcode].name;
199         return "INVALID";
200 }
201
202 struct sock *io_uring_get_socket(struct file *file)
203 {
204 #if defined(CONFIG_UNIX)
205         if (io_is_uring_fops(file)) {
206                 struct io_ring_ctx *ctx = file->private_data;
207
208                 return ctx->ring_sock->sk;
209         }
210 #endif
211         return NULL;
212 }
213 EXPORT_SYMBOL(io_uring_get_socket);
214
215 static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
216 {
217         if (!*locked) {
218                 mutex_lock(&ctx->uring_lock);
219                 *locked = true;
220         }
221 }
222
223 static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
224 {
225         if (!wq_list_empty(&ctx->submit_state.compl_reqs))
226                 __io_submit_flush_completions(ctx);
227 }
228
229 static bool io_match_linked(struct io_kiocb *head)
230 {
231         struct io_kiocb *req;
232
233         io_for_each_link(req, head) {
234                 if (req->flags & REQ_F_INFLIGHT)
235                         return true;
236         }
237         return false;
238 }
239
240 /*
241  * As io_match_task() but protected against racing with linked timeouts.
242  * User must not hold timeout_lock.
243  */
244 bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
245                         bool cancel_all)
246 {
247         bool matched;
248
249         if (task && head->task != task)
250                 return false;
251         if (cancel_all)
252                 return true;
253
254         if (head->flags & REQ_F_LINK_TIMEOUT) {
255                 struct io_ring_ctx *ctx = head->ctx;
256
257                 /* protect against races with linked timeouts */
258                 spin_lock_irq(&ctx->timeout_lock);
259                 matched = io_match_linked(head);
260                 spin_unlock_irq(&ctx->timeout_lock);
261         } else {
262                 matched = io_match_linked(head);
263         }
264         return matched;
265 }
266
267 static inline void req_fail_link_node(struct io_kiocb *req, int res)
268 {
269         req_set_fail(req);
270         io_req_set_res(req, res, 0);
271 }
272
273 static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx)
274 {
275         wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list);
276 }
277
278 static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
279 {
280         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
281
282         complete(&ctx->ref_comp);
283 }
284
285 static __cold void io_fallback_req_func(struct work_struct *work)
286 {
287         struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
288                                                 fallback_work.work);
289         struct llist_node *node = llist_del_all(&ctx->fallback_llist);
290         struct io_kiocb *req, *tmp;
291         bool locked = false;
292
293         percpu_ref_get(&ctx->refs);
294         llist_for_each_entry_safe(req, tmp, node, io_task_work.fallback_node)
295                 req->io_task_work.func(req, &locked);
296
297         if (locked) {
298                 io_submit_flush_completions(ctx);
299                 mutex_unlock(&ctx->uring_lock);
300         }
301         percpu_ref_put(&ctx->refs);
302 }
303
304 static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
305 {
306         struct io_ring_ctx *ctx;
307         int hash_bits;
308
309         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
310         if (!ctx)
311                 return NULL;
312
313         xa_init(&ctx->io_bl_xa);
314
315         /*
316          * Use 5 bits less than the max cq entries, that should give us around
317          * 32 entries per hash list if totally full and uniformly spread.
318          */
319         hash_bits = ilog2(p->cq_entries);
320         hash_bits -= 5;
321         if (hash_bits <= 0)
322                 hash_bits = 1;
323         ctx->cancel_hash_bits = hash_bits;
324         ctx->cancel_hash = kmalloc((1U << hash_bits) * sizeof(struct hlist_head),
325                                         GFP_KERNEL);
326         if (!ctx->cancel_hash)
327                 goto err;
328         __hash_init(ctx->cancel_hash, 1U << hash_bits);
329
330         ctx->dummy_ubuf = kzalloc(sizeof(*ctx->dummy_ubuf), GFP_KERNEL);
331         if (!ctx->dummy_ubuf)
332                 goto err;
333         /* set invalid range, so io_import_fixed() fails meeting it */
334         ctx->dummy_ubuf->ubuf = -1UL;
335
336         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
337                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
338                 goto err;
339
340         ctx->flags = p->flags;
341         init_waitqueue_head(&ctx->sqo_sq_wait);
342         INIT_LIST_HEAD(&ctx->sqd_list);
343         INIT_LIST_HEAD(&ctx->cq_overflow_list);
344         INIT_LIST_HEAD(&ctx->io_buffers_cache);
345         INIT_LIST_HEAD(&ctx->apoll_cache);
346         init_completion(&ctx->ref_comp);
347         xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
348         mutex_init(&ctx->uring_lock);
349         init_waitqueue_head(&ctx->cq_wait);
350         spin_lock_init(&ctx->completion_lock);
351         spin_lock_init(&ctx->timeout_lock);
352         INIT_WQ_LIST(&ctx->iopoll_list);
353         INIT_LIST_HEAD(&ctx->io_buffers_pages);
354         INIT_LIST_HEAD(&ctx->io_buffers_comp);
355         INIT_LIST_HEAD(&ctx->defer_list);
356         INIT_LIST_HEAD(&ctx->timeout_list);
357         INIT_LIST_HEAD(&ctx->ltimeout_list);
358         spin_lock_init(&ctx->rsrc_ref_lock);
359         INIT_LIST_HEAD(&ctx->rsrc_ref_list);
360         INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
361         init_llist_head(&ctx->rsrc_put_llist);
362         INIT_LIST_HEAD(&ctx->tctx_list);
363         ctx->submit_state.free_list.next = NULL;
364         INIT_WQ_LIST(&ctx->locked_free_list);
365         INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
366         INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
367         return ctx;
368 err:
369         kfree(ctx->dummy_ubuf);
370         kfree(ctx->cancel_hash);
371         kfree(ctx->io_bl);
372         xa_destroy(&ctx->io_bl_xa);
373         kfree(ctx);
374         return NULL;
375 }
376
377 static void io_account_cq_overflow(struct io_ring_ctx *ctx)
378 {
379         struct io_rings *r = ctx->rings;
380
381         WRITE_ONCE(r->cq_overflow, READ_ONCE(r->cq_overflow) + 1);
382         ctx->cq_extra--;
383 }
384
385 static bool req_need_defer(struct io_kiocb *req, u32 seq)
386 {
387         if (unlikely(req->flags & REQ_F_IO_DRAIN)) {
388                 struct io_ring_ctx *ctx = req->ctx;
389
390                 return seq + READ_ONCE(ctx->cq_extra) != ctx->cached_cq_tail;
391         }
392
393         return false;
394 }
395
396 static inline bool io_req_ffs_set(struct io_kiocb *req)
397 {
398         return req->flags & REQ_F_FIXED_FILE;
399 }
400
401 static inline void io_req_track_inflight(struct io_kiocb *req)
402 {
403         if (!(req->flags & REQ_F_INFLIGHT)) {
404                 req->flags |= REQ_F_INFLIGHT;
405                 atomic_inc(&req->task->io_uring->inflight_tracked);
406         }
407 }
408
409 static struct io_kiocb *__io_prep_linked_timeout(struct io_kiocb *req)
410 {
411         if (WARN_ON_ONCE(!req->link))
412                 return NULL;
413
414         req->flags &= ~REQ_F_ARM_LTIMEOUT;
415         req->flags |= REQ_F_LINK_TIMEOUT;
416
417         /* linked timeouts should have two refs once prep'ed */
418         io_req_set_refcount(req);
419         __io_req_set_refcount(req->link, 2);
420         return req->link;
421 }
422
423 static inline struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
424 {
425         if (likely(!(req->flags & REQ_F_ARM_LTIMEOUT)))
426                 return NULL;
427         return __io_prep_linked_timeout(req);
428 }
429
430 static noinline void __io_arm_ltimeout(struct io_kiocb *req)
431 {
432         io_queue_linked_timeout(__io_prep_linked_timeout(req));
433 }
434
435 static inline void io_arm_ltimeout(struct io_kiocb *req)
436 {
437         if (unlikely(req->flags & REQ_F_ARM_LTIMEOUT))
438                 __io_arm_ltimeout(req);
439 }
440
441 static void io_prep_async_work(struct io_kiocb *req)
442 {
443         const struct io_op_def *def = &io_op_defs[req->opcode];
444         struct io_ring_ctx *ctx = req->ctx;
445
446         if (!(req->flags & REQ_F_CREDS)) {
447                 req->flags |= REQ_F_CREDS;
448                 req->creds = get_current_cred();
449         }
450
451         req->work.list.next = NULL;
452         req->work.flags = 0;
453         req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
454         if (req->flags & REQ_F_FORCE_ASYNC)
455                 req->work.flags |= IO_WQ_WORK_CONCURRENT;
456
457         if (req->flags & REQ_F_ISREG) {
458                 if (def->hash_reg_file || (ctx->flags & IORING_SETUP_IOPOLL))
459                         io_wq_hash_work(&req->work, file_inode(req->file));
460         } else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
461                 if (def->unbound_nonreg_file)
462                         req->work.flags |= IO_WQ_WORK_UNBOUND;
463         }
464 }
465
466 static void io_prep_async_link(struct io_kiocb *req)
467 {
468         struct io_kiocb *cur;
469
470         if (req->flags & REQ_F_LINK_TIMEOUT) {
471                 struct io_ring_ctx *ctx = req->ctx;
472
473                 spin_lock_irq(&ctx->timeout_lock);
474                 io_for_each_link(cur, req)
475                         io_prep_async_work(cur);
476                 spin_unlock_irq(&ctx->timeout_lock);
477         } else {
478                 io_for_each_link(cur, req)
479                         io_prep_async_work(cur);
480         }
481 }
482
483 static inline void io_req_add_compl_list(struct io_kiocb *req)
484 {
485         struct io_submit_state *state = &req->ctx->submit_state;
486
487         if (!(req->flags & REQ_F_CQE_SKIP))
488                 state->flush_cqes = true;
489         wq_list_add_tail(&req->comp_list, &state->compl_reqs);
490 }
491
492 static void io_queue_iowq(struct io_kiocb *req, bool *dont_use)
493 {
494         struct io_kiocb *link = io_prep_linked_timeout(req);
495         struct io_uring_task *tctx = req->task->io_uring;
496
497         BUG_ON(!tctx);
498         BUG_ON(!tctx->io_wq);
499
500         /* init ->work of the whole link before punting */
501         io_prep_async_link(req);
502
503         /*
504          * Not expected to happen, but if we do have a bug where this _can_
505          * happen, catch it here and ensure the request is marked as
506          * canceled. That will make io-wq go through the usual work cancel
507          * procedure rather than attempt to run this request (or create a new
508          * worker for it).
509          */
510         if (WARN_ON_ONCE(!same_thread_group(req->task, current)))
511                 req->work.flags |= IO_WQ_WORK_CANCEL;
512
513         trace_io_uring_queue_async_work(req->ctx, req, req->cqe.user_data,
514                                         req->opcode, req->flags, &req->work,
515                                         io_wq_is_hashed(&req->work));
516         io_wq_enqueue(tctx->io_wq, &req->work);
517         if (link)
518                 io_queue_linked_timeout(link);
519 }
520
521 static __cold void io_queue_deferred(struct io_ring_ctx *ctx)
522 {
523         while (!list_empty(&ctx->defer_list)) {
524                 struct io_defer_entry *de = list_first_entry(&ctx->defer_list,
525                                                 struct io_defer_entry, list);
526
527                 if (req_need_defer(de->req, de->seq))
528                         break;
529                 list_del_init(&de->list);
530                 io_req_task_queue(de->req);
531                 kfree(de);
532         }
533 }
534
535 static void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
536 {
537         if (ctx->off_timeout_used || ctx->drain_active) {
538                 spin_lock(&ctx->completion_lock);
539                 if (ctx->off_timeout_used)
540                         io_flush_timeouts(ctx);
541                 if (ctx->drain_active)
542                         io_queue_deferred(ctx);
543                 io_commit_cqring(ctx);
544                 spin_unlock(&ctx->completion_lock);
545         }
546         if (ctx->has_evfd)
547                 io_eventfd_signal(ctx);
548 }
549
550 static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
551 {
552         return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
553 }
554
555 /*
556  * writes to the cq entry need to come after reading head; the
557  * control dependency is enough as we're using WRITE_ONCE to
558  * fill the cq entry
559  */
560 static noinline struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx)
561 {
562         struct io_rings *rings = ctx->rings;
563         unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
564         unsigned int shift = 0;
565         unsigned int free, queued, len;
566
567         if (ctx->flags & IORING_SETUP_CQE32)
568                 shift = 1;
569
570         /* userspace may cheat modifying the tail, be safe and do min */
571         queued = min(__io_cqring_events(ctx), ctx->cq_entries);
572         free = ctx->cq_entries - queued;
573         /* we need a contiguous range, limit based on the current array offset */
574         len = min(free, ctx->cq_entries - off);
575         if (!len)
576                 return NULL;
577
578         ctx->cached_cq_tail++;
579         ctx->cqe_cached = &rings->cqes[off];
580         ctx->cqe_sentinel = ctx->cqe_cached + len;
581         ctx->cqe_cached++;
582         return &rings->cqes[off << shift];
583 }
584
585 static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
586 {
587         if (likely(ctx->cqe_cached < ctx->cqe_sentinel)) {
588                 struct io_uring_cqe *cqe = ctx->cqe_cached;
589
590                 if (ctx->flags & IORING_SETUP_CQE32) {
591                         unsigned int off = ctx->cqe_cached - ctx->rings->cqes;
592
593                         cqe += off;
594                 }
595
596                 ctx->cached_cq_tail++;
597                 ctx->cqe_cached++;
598                 return cqe;
599         }
600
601         return __io_get_cqe(ctx);
602 }
603
604 static void io_eventfd_signal(struct io_ring_ctx *ctx)
605 {
606         struct io_ev_fd *ev_fd;
607
608         rcu_read_lock();
609         /*
610          * rcu_dereference ctx->io_ev_fd once and use it for both for checking
611          * and eventfd_signal
612          */
613         ev_fd = rcu_dereference(ctx->io_ev_fd);
614
615         /*
616          * Check again if ev_fd exists incase an io_eventfd_unregister call
617          * completed between the NULL check of ctx->io_ev_fd at the start of
618          * the function and rcu_read_lock.
619          */
620         if (unlikely(!ev_fd))
621                 goto out;
622         if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED)
623                 goto out;
624
625         if (!ev_fd->eventfd_async || io_wq_current_is_worker())
626                 eventfd_signal(ev_fd->cq_ev_fd, 1);
627 out:
628         rcu_read_unlock();
629 }
630
631 static inline void io_cqring_wake(struct io_ring_ctx *ctx)
632 {
633         /*
634          * wake_up_all() may seem excessive, but io_wake_function() and
635          * io_should_wake() handle the termination of the loop and only
636          * wake as many waiters as we need to.
637          */
638         if (wq_has_sleeper(&ctx->cq_wait))
639                 wake_up_all(&ctx->cq_wait);
640 }
641
642 /*
643  * This should only get called when at least one event has been posted.
644  * Some applications rely on the eventfd notification count only changing
645  * IFF a new CQE has been added to the CQ ring. There's no depedency on
646  * 1:1 relationship between how many times this function is called (and
647  * hence the eventfd count) and number of CQEs posted to the CQ ring.
648  */
649 void io_cqring_ev_posted(struct io_ring_ctx *ctx)
650 {
651         if (unlikely(ctx->off_timeout_used || ctx->drain_active ||
652                      ctx->has_evfd))
653                 __io_commit_cqring_flush(ctx);
654
655         io_cqring_wake(ctx);
656 }
657
658 static void io_cqring_ev_posted_iopoll(struct io_ring_ctx *ctx)
659 {
660         if (unlikely(ctx->off_timeout_used || ctx->drain_active ||
661                      ctx->has_evfd))
662                 __io_commit_cqring_flush(ctx);
663
664         if (ctx->flags & IORING_SETUP_SQPOLL)
665                 io_cqring_wake(ctx);
666 }
667
668 /* Returns true if there are no backlogged entries after the flush */
669 static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
670 {
671         bool all_flushed, posted;
672         size_t cqe_size = sizeof(struct io_uring_cqe);
673
674         if (!force && __io_cqring_events(ctx) == ctx->cq_entries)
675                 return false;
676
677         if (ctx->flags & IORING_SETUP_CQE32)
678                 cqe_size <<= 1;
679
680         posted = false;
681         spin_lock(&ctx->completion_lock);
682         while (!list_empty(&ctx->cq_overflow_list)) {
683                 struct io_uring_cqe *cqe = io_get_cqe(ctx);
684                 struct io_overflow_cqe *ocqe;
685
686                 if (!cqe && !force)
687                         break;
688                 ocqe = list_first_entry(&ctx->cq_overflow_list,
689                                         struct io_overflow_cqe, list);
690                 if (cqe)
691                         memcpy(cqe, &ocqe->cqe, cqe_size);
692                 else
693                         io_account_cq_overflow(ctx);
694
695                 posted = true;
696                 list_del(&ocqe->list);
697                 kfree(ocqe);
698         }
699
700         all_flushed = list_empty(&ctx->cq_overflow_list);
701         if (all_flushed) {
702                 clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
703                 atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
704         }
705
706         io_commit_cqring(ctx);
707         spin_unlock(&ctx->completion_lock);
708         if (posted)
709                 io_cqring_ev_posted(ctx);
710         return all_flushed;
711 }
712
713 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx)
714 {
715         bool ret = true;
716
717         if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
718                 /* iopoll syncs against uring_lock, not completion_lock */
719                 if (ctx->flags & IORING_SETUP_IOPOLL)
720                         mutex_lock(&ctx->uring_lock);
721                 ret = __io_cqring_overflow_flush(ctx, false);
722                 if (ctx->flags & IORING_SETUP_IOPOLL)
723                         mutex_unlock(&ctx->uring_lock);
724         }
725
726         return ret;
727 }
728
729 static void __io_put_task(struct task_struct *task, int nr)
730 {
731         struct io_uring_task *tctx = task->io_uring;
732
733         percpu_counter_sub(&tctx->inflight, nr);
734         if (unlikely(atomic_read(&tctx->in_idle)))
735                 wake_up(&tctx->wait);
736         put_task_struct_many(task, nr);
737 }
738
739 /* must to be called somewhat shortly after putting a request */
740 static inline void io_put_task(struct task_struct *task, int nr)
741 {
742         if (likely(task == current))
743                 task->io_uring->cached_refs += nr;
744         else
745                 __io_put_task(task, nr);
746 }
747
748 static void io_task_refs_refill(struct io_uring_task *tctx)
749 {
750         unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;
751
752         percpu_counter_add(&tctx->inflight, refill);
753         refcount_add(refill, &current->usage);
754         tctx->cached_refs += refill;
755 }
756
757 static inline void io_get_task_refs(int nr)
758 {
759         struct io_uring_task *tctx = current->io_uring;
760
761         tctx->cached_refs -= nr;
762         if (unlikely(tctx->cached_refs < 0))
763                 io_task_refs_refill(tctx);
764 }
765
766 static __cold void io_uring_drop_tctx_refs(struct task_struct *task)
767 {
768         struct io_uring_task *tctx = task->io_uring;
769         unsigned int refs = tctx->cached_refs;
770
771         if (refs) {
772                 tctx->cached_refs = 0;
773                 percpu_counter_sub(&tctx->inflight, refs);
774                 put_task_struct_many(task, refs);
775         }
776 }
777
778 static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
779                                      s32 res, u32 cflags, u64 extra1,
780                                      u64 extra2)
781 {
782         struct io_overflow_cqe *ocqe;
783         size_t ocq_size = sizeof(struct io_overflow_cqe);
784         bool is_cqe32 = (ctx->flags & IORING_SETUP_CQE32);
785
786         if (is_cqe32)
787                 ocq_size += sizeof(struct io_uring_cqe);
788
789         ocqe = kmalloc(ocq_size, GFP_ATOMIC | __GFP_ACCOUNT);
790         trace_io_uring_cqe_overflow(ctx, user_data, res, cflags, ocqe);
791         if (!ocqe) {
792                 /*
793                  * If we're in ring overflow flush mode, or in task cancel mode,
794                  * or cannot allocate an overflow entry, then we need to drop it
795                  * on the floor.
796                  */
797                 io_account_cq_overflow(ctx);
798                 set_bit(IO_CHECK_CQ_DROPPED_BIT, &ctx->check_cq);
799                 return false;
800         }
801         if (list_empty(&ctx->cq_overflow_list)) {
802                 set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
803                 atomic_or(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
804
805         }
806         ocqe->cqe.user_data = user_data;
807         ocqe->cqe.res = res;
808         ocqe->cqe.flags = cflags;
809         if (is_cqe32) {
810                 ocqe->cqe.big_cqe[0] = extra1;
811                 ocqe->cqe.big_cqe[1] = extra2;
812         }
813         list_add_tail(&ocqe->list, &ctx->cq_overflow_list);
814         return true;
815 }
816
817 static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
818                                      struct io_kiocb *req)
819 {
820         struct io_uring_cqe *cqe;
821
822         if (!(ctx->flags & IORING_SETUP_CQE32)) {
823                 trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
824                                         req->cqe.res, req->cqe.flags, 0, 0);
825
826                 /*
827                  * If we can't get a cq entry, userspace overflowed the
828                  * submission (by quite a lot). Increment the overflow count in
829                  * the ring.
830                  */
831                 cqe = io_get_cqe(ctx);
832                 if (likely(cqe)) {
833                         memcpy(cqe, &req->cqe, sizeof(*cqe));
834                         return true;
835                 }
836
837                 return io_cqring_event_overflow(ctx, req->cqe.user_data,
838                                                 req->cqe.res, req->cqe.flags,
839                                                 0, 0);
840         } else {
841                 u64 extra1 = 0, extra2 = 0;
842
843                 if (req->flags & REQ_F_CQE32_INIT) {
844                         extra1 = req->extra1;
845                         extra2 = req->extra2;
846                 }
847
848                 trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
849                                         req->cqe.res, req->cqe.flags, extra1, extra2);
850
851                 /*
852                  * If we can't get a cq entry, userspace overflowed the
853                  * submission (by quite a lot). Increment the overflow count in
854                  * the ring.
855                  */
856                 cqe = io_get_cqe(ctx);
857                 if (likely(cqe)) {
858                         memcpy(cqe, &req->cqe, sizeof(struct io_uring_cqe));
859                         WRITE_ONCE(cqe->big_cqe[0], extra1);
860                         WRITE_ONCE(cqe->big_cqe[1], extra2);
861                         return true;
862                 }
863
864                 return io_cqring_event_overflow(ctx, req->cqe.user_data,
865                                 req->cqe.res, req->cqe.flags,
866                                 extra1, extra2);
867         }
868 }
869
870 bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
871                      u32 cflags)
872 {
873         struct io_uring_cqe *cqe;
874
875         ctx->cq_extra++;
876         trace_io_uring_complete(ctx, NULL, user_data, res, cflags, 0, 0);
877
878         /*
879          * If we can't get a cq entry, userspace overflowed the
880          * submission (by quite a lot). Increment the overflow count in
881          * the ring.
882          */
883         cqe = io_get_cqe(ctx);
884         if (likely(cqe)) {
885                 WRITE_ONCE(cqe->user_data, user_data);
886                 WRITE_ONCE(cqe->res, res);
887                 WRITE_ONCE(cqe->flags, cflags);
888
889                 if (ctx->flags & IORING_SETUP_CQE32) {
890                         WRITE_ONCE(cqe->big_cqe[0], 0);
891                         WRITE_ONCE(cqe->big_cqe[1], 0);
892                 }
893                 return true;
894         }
895         return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
896 }
897
898 static void __io_req_complete_put(struct io_kiocb *req)
899 {
900         /*
901          * If we're the last reference to this request, add to our locked
902          * free_list cache.
903          */
904         if (req_ref_put_and_test(req)) {
905                 struct io_ring_ctx *ctx = req->ctx;
906
907                 if (req->flags & IO_REQ_LINK_FLAGS) {
908                         if (req->flags & IO_DISARM_MASK)
909                                 io_disarm_next(req);
910                         if (req->link) {
911                                 io_req_task_queue(req->link);
912                                 req->link = NULL;
913                         }
914                 }
915                 io_req_put_rsrc(req);
916                 /*
917                  * Selected buffer deallocation in io_clean_op() assumes that
918                  * we don't hold ->completion_lock. Clean them here to avoid
919                  * deadlocks.
920                  */
921                 io_put_kbuf_comp(req);
922                 io_dismantle_req(req);
923                 io_put_task(req->task, 1);
924                 wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
925                 ctx->locked_free_nr++;
926         }
927 }
928
929 void __io_req_complete_post(struct io_kiocb *req)
930 {
931         if (!(req->flags & REQ_F_CQE_SKIP))
932                 __io_fill_cqe_req(req->ctx, req);
933         __io_req_complete_put(req);
934 }
935
936 void io_req_complete_post(struct io_kiocb *req)
937 {
938         struct io_ring_ctx *ctx = req->ctx;
939
940         spin_lock(&ctx->completion_lock);
941         __io_req_complete_post(req);
942         io_commit_cqring(ctx);
943         spin_unlock(&ctx->completion_lock);
944         io_cqring_ev_posted(ctx);
945 }
946
947 inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags)
948 {
949         if (issue_flags & IO_URING_F_COMPLETE_DEFER)
950                 req->flags |= REQ_F_COMPLETE_INLINE;
951         else
952                 io_req_complete_post(req);
953 }
954
955 void io_req_complete_failed(struct io_kiocb *req, s32 res)
956 {
957         req_set_fail(req);
958         io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
959         io_req_complete_post(req);
960 }
961
962 /*
963  * Don't initialise the fields below on every allocation, but do that in
964  * advance and keep them valid across allocations.
965  */
966 static void io_preinit_req(struct io_kiocb *req, struct io_ring_ctx *ctx)
967 {
968         req->ctx = ctx;
969         req->link = NULL;
970         req->async_data = NULL;
971         /* not necessary, but safer to zero */
972         req->cqe.res = 0;
973 }
974
975 static void io_flush_cached_locked_reqs(struct io_ring_ctx *ctx,
976                                         struct io_submit_state *state)
977 {
978         spin_lock(&ctx->completion_lock);
979         wq_list_splice(&ctx->locked_free_list, &state->free_list);
980         ctx->locked_free_nr = 0;
981         spin_unlock(&ctx->completion_lock);
982 }
983
984 static inline bool io_req_cache_empty(struct io_ring_ctx *ctx)
985 {
986         return !ctx->submit_state.free_list.next;
987 }
988
989 /*
990  * A request might get retired back into the request caches even before opcode
991  * handlers and io_issue_sqe() are done with it, e.g. inline completion path.
992  * Because of that, io_alloc_req() should be called only under ->uring_lock
993  * and with extra caution to not get a request that is still worked on.
994  */
995 static __cold bool __io_alloc_req_refill(struct io_ring_ctx *ctx)
996         __must_hold(&ctx->uring_lock)
997 {
998         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
999         void *reqs[IO_REQ_ALLOC_BATCH];
1000         int ret, i;
1001
1002         /*
1003          * If we have more than a batch's worth of requests in our IRQ side
1004          * locked cache, grab the lock and move them over to our submission
1005          * side cache.
1006          */
1007         if (data_race(ctx->locked_free_nr) > IO_COMPL_BATCH) {
1008                 io_flush_cached_locked_reqs(ctx, &ctx->submit_state);
1009                 if (!io_req_cache_empty(ctx))
1010                         return true;
1011         }
1012
1013         ret = kmem_cache_alloc_bulk(req_cachep, gfp, ARRAY_SIZE(reqs), reqs);
1014
1015         /*
1016          * Bulk alloc is all-or-nothing. If we fail to get a batch,
1017          * retry single alloc to be on the safe side.
1018          */
1019         if (unlikely(ret <= 0)) {
1020                 reqs[0] = kmem_cache_alloc(req_cachep, gfp);
1021                 if (!reqs[0])
1022                         return false;
1023                 ret = 1;
1024         }
1025
1026         percpu_ref_get_many(&ctx->refs, ret);
1027         for (i = 0; i < ret; i++) {
1028                 struct io_kiocb *req = reqs[i];
1029
1030                 io_preinit_req(req, ctx);
1031                 io_req_add_to_cache(req, ctx);
1032         }
1033         return true;
1034 }
1035
1036 static inline bool io_alloc_req_refill(struct io_ring_ctx *ctx)
1037 {
1038         if (unlikely(io_req_cache_empty(ctx)))
1039                 return __io_alloc_req_refill(ctx);
1040         return true;
1041 }
1042
1043 static inline struct io_kiocb *io_alloc_req(struct io_ring_ctx *ctx)
1044 {
1045         struct io_wq_work_node *node;
1046
1047         node = wq_stack_extract(&ctx->submit_state.free_list);
1048         return container_of(node, struct io_kiocb, comp_list);
1049 }
1050
1051 static inline void io_dismantle_req(struct io_kiocb *req)
1052 {
1053         unsigned int flags = req->flags;
1054
1055         if (unlikely(flags & IO_REQ_CLEAN_FLAGS))
1056                 io_clean_op(req);
1057         if (!(flags & REQ_F_FIXED_FILE))
1058                 io_put_file(req->file);
1059 }
1060
1061 __cold void io_free_req(struct io_kiocb *req)
1062 {
1063         struct io_ring_ctx *ctx = req->ctx;
1064
1065         io_req_put_rsrc(req);
1066         io_dismantle_req(req);
1067         io_put_task(req->task, 1);
1068
1069         spin_lock(&ctx->completion_lock);
1070         wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
1071         ctx->locked_free_nr++;
1072         spin_unlock(&ctx->completion_lock);
1073 }
1074
1075 static void __io_req_find_next_prep(struct io_kiocb *req)
1076 {
1077         struct io_ring_ctx *ctx = req->ctx;
1078         bool posted;
1079
1080         spin_lock(&ctx->completion_lock);
1081         posted = io_disarm_next(req);
1082         io_commit_cqring(ctx);
1083         spin_unlock(&ctx->completion_lock);
1084         if (posted)
1085                 io_cqring_ev_posted(ctx);
1086 }
1087
1088 static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
1089 {
1090         struct io_kiocb *nxt;
1091
1092         /*
1093          * If LINK is set, we have dependent requests in this chain. If we
1094          * didn't fail this request, queue the first one up, moving any other
1095          * dependencies to the next request. In case of failure, fail the rest
1096          * of the chain.
1097          */
1098         if (unlikely(req->flags & IO_DISARM_MASK))
1099                 __io_req_find_next_prep(req);
1100         nxt = req->link;
1101         req->link = NULL;
1102         return nxt;
1103 }
1104
1105 static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
1106 {
1107         if (!ctx)
1108                 return;
1109         if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1110                 atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1111         if (*locked) {
1112                 io_submit_flush_completions(ctx);
1113                 mutex_unlock(&ctx->uring_lock);
1114                 *locked = false;
1115         }
1116         percpu_ref_put(&ctx->refs);
1117 }
1118
1119 static inline void ctx_commit_and_unlock(struct io_ring_ctx *ctx)
1120 {
1121         io_commit_cqring(ctx);
1122         spin_unlock(&ctx->completion_lock);
1123         io_cqring_ev_posted(ctx);
1124 }
1125
1126 static void handle_prev_tw_list(struct io_wq_work_node *node,
1127                                 struct io_ring_ctx **ctx, bool *uring_locked)
1128 {
1129         if (*ctx && !*uring_locked)
1130                 spin_lock(&(*ctx)->completion_lock);
1131
1132         do {
1133                 struct io_wq_work_node *next = node->next;
1134                 struct io_kiocb *req = container_of(node, struct io_kiocb,
1135                                                     io_task_work.node);
1136
1137                 prefetch(container_of(next, struct io_kiocb, io_task_work.node));
1138
1139                 if (req->ctx != *ctx) {
1140                         if (unlikely(!*uring_locked && *ctx))
1141                                 ctx_commit_and_unlock(*ctx);
1142
1143                         ctx_flush_and_put(*ctx, uring_locked);
1144                         *ctx = req->ctx;
1145                         /* if not contended, grab and improve batching */
1146                         *uring_locked = mutex_trylock(&(*ctx)->uring_lock);
1147                         percpu_ref_get(&(*ctx)->refs);
1148                         if (unlikely(!*uring_locked))
1149                                 spin_lock(&(*ctx)->completion_lock);
1150                 }
1151                 if (likely(*uring_locked)) {
1152                         req->io_task_work.func(req, uring_locked);
1153                 } else {
1154                         req->cqe.flags = io_put_kbuf_comp(req);
1155                         __io_req_complete_post(req);
1156                 }
1157                 node = next;
1158         } while (node);
1159
1160         if (unlikely(!*uring_locked))
1161                 ctx_commit_and_unlock(*ctx);
1162 }
1163
1164 static void handle_tw_list(struct io_wq_work_node *node,
1165                            struct io_ring_ctx **ctx, bool *locked)
1166 {
1167         do {
1168                 struct io_wq_work_node *next = node->next;
1169                 struct io_kiocb *req = container_of(node, struct io_kiocb,
1170                                                     io_task_work.node);
1171
1172                 prefetch(container_of(next, struct io_kiocb, io_task_work.node));
1173
1174                 if (req->ctx != *ctx) {
1175                         ctx_flush_and_put(*ctx, locked);
1176                         *ctx = req->ctx;
1177                         /* if not contended, grab and improve batching */
1178                         *locked = mutex_trylock(&(*ctx)->uring_lock);
1179                         percpu_ref_get(&(*ctx)->refs);
1180                 }
1181                 req->io_task_work.func(req, locked);
1182                 node = next;
1183         } while (node);
1184 }
1185
1186 void tctx_task_work(struct callback_head *cb)
1187 {
1188         bool uring_locked = false;
1189         struct io_ring_ctx *ctx = NULL;
1190         struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
1191                                                   task_work);
1192
1193         while (1) {
1194                 struct io_wq_work_node *node1, *node2;
1195
1196                 spin_lock_irq(&tctx->task_lock);
1197                 node1 = tctx->prio_task_list.first;
1198                 node2 = tctx->task_list.first;
1199                 INIT_WQ_LIST(&tctx->task_list);
1200                 INIT_WQ_LIST(&tctx->prio_task_list);
1201                 if (!node2 && !node1)
1202                         tctx->task_running = false;
1203                 spin_unlock_irq(&tctx->task_lock);
1204                 if (!node2 && !node1)
1205                         break;
1206
1207                 if (node1)
1208                         handle_prev_tw_list(node1, &ctx, &uring_locked);
1209                 if (node2)
1210                         handle_tw_list(node2, &ctx, &uring_locked);
1211                 cond_resched();
1212
1213                 if (data_race(!tctx->task_list.first) &&
1214                     data_race(!tctx->prio_task_list.first) && uring_locked)
1215                         io_submit_flush_completions(ctx);
1216         }
1217
1218         ctx_flush_and_put(ctx, &uring_locked);
1219
1220         /* relaxed read is enough as only the task itself sets ->in_idle */
1221         if (unlikely(atomic_read(&tctx->in_idle)))
1222                 io_uring_drop_tctx_refs(current);
1223 }
1224
1225 static void __io_req_task_work_add(struct io_kiocb *req,
1226                                    struct io_uring_task *tctx,
1227                                    struct io_wq_work_list *list)
1228 {
1229         struct io_ring_ctx *ctx = req->ctx;
1230         struct io_wq_work_node *node;
1231         unsigned long flags;
1232         bool running;
1233
1234         spin_lock_irqsave(&tctx->task_lock, flags);
1235         wq_list_add_tail(&req->io_task_work.node, list);
1236         running = tctx->task_running;
1237         if (!running)
1238                 tctx->task_running = true;
1239         spin_unlock_irqrestore(&tctx->task_lock, flags);
1240
1241         /* task_work already pending, we're done */
1242         if (running)
1243                 return;
1244
1245         if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1246                 atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1247
1248         if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
1249                 return;
1250
1251         spin_lock_irqsave(&tctx->task_lock, flags);
1252         tctx->task_running = false;
1253         node = wq_list_merge(&tctx->prio_task_list, &tctx->task_list);
1254         spin_unlock_irqrestore(&tctx->task_lock, flags);
1255
1256         while (node) {
1257                 req = container_of(node, struct io_kiocb, io_task_work.node);
1258                 node = node->next;
1259                 if (llist_add(&req->io_task_work.fallback_node,
1260                               &req->ctx->fallback_llist))
1261                         schedule_delayed_work(&req->ctx->fallback_work, 1);
1262         }
1263 }
1264
1265 void io_req_task_work_add(struct io_kiocb *req)
1266 {
1267         struct io_uring_task *tctx = req->task->io_uring;
1268
1269         __io_req_task_work_add(req, tctx, &tctx->task_list);
1270 }
1271
1272 static void io_req_task_prio_work_add(struct io_kiocb *req)
1273 {
1274         struct io_uring_task *tctx = req->task->io_uring;
1275
1276         if (req->ctx->flags & IORING_SETUP_SQPOLL)
1277                 __io_req_task_work_add(req, tctx, &tctx->prio_task_list);
1278         else
1279                 __io_req_task_work_add(req, tctx, &tctx->task_list);
1280 }
1281
1282 static void io_req_tw_post(struct io_kiocb *req, bool *locked)
1283 {
1284         io_req_complete_post(req);
1285 }
1286
1287 void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags)
1288 {
1289         io_req_set_res(req, res, cflags);
1290         req->io_task_work.func = io_req_tw_post;
1291         io_req_task_work_add(req);
1292 }
1293
1294 static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
1295 {
1296         /* not needed for normal modes, but SQPOLL depends on it */
1297         io_tw_lock(req->ctx, locked);
1298         io_req_complete_failed(req, req->cqe.res);
1299 }
1300
1301 void io_req_task_submit(struct io_kiocb *req, bool *locked)
1302 {
1303         io_tw_lock(req->ctx, locked);
1304         /* req->task == current here, checking PF_EXITING is safe */
1305         if (likely(!(req->task->flags & PF_EXITING)))
1306                 io_queue_sqe(req);
1307         else
1308                 io_req_complete_failed(req, -EFAULT);
1309 }
1310
1311 void io_req_task_queue_fail(struct io_kiocb *req, int ret)
1312 {
1313         io_req_set_res(req, ret, 0);
1314         req->io_task_work.func = io_req_task_cancel;
1315         io_req_task_work_add(req);
1316 }
1317
1318 static void io_req_task_queue(struct io_kiocb *req)
1319 {
1320         req->io_task_work.func = io_req_task_submit;
1321         io_req_task_work_add(req);
1322 }
1323
1324 static void io_req_task_queue_reissue(struct io_kiocb *req)
1325 {
1326         req->io_task_work.func = io_queue_iowq;
1327         io_req_task_work_add(req);
1328 }
1329
1330 void io_queue_next(struct io_kiocb *req)
1331 {
1332         struct io_kiocb *nxt = io_req_find_next(req);
1333
1334         if (nxt)
1335                 io_req_task_queue(nxt);
1336 }
1337
1338 static void io_free_batch_list(struct io_ring_ctx *ctx,
1339                                 struct io_wq_work_node *node)
1340         __must_hold(&ctx->uring_lock)
1341 {
1342         struct task_struct *task = NULL;
1343         int task_refs = 0;
1344
1345         do {
1346                 struct io_kiocb *req = container_of(node, struct io_kiocb,
1347                                                     comp_list);
1348
1349                 if (unlikely(req->flags & IO_REQ_CLEAN_SLOW_FLAGS)) {
1350                         if (req->flags & REQ_F_REFCOUNT) {
1351                                 node = req->comp_list.next;
1352                                 if (!req_ref_put_and_test(req))
1353                                         continue;
1354                         }
1355                         if ((req->flags & REQ_F_POLLED) && req->apoll) {
1356                                 struct async_poll *apoll = req->apoll;
1357
1358                                 if (apoll->double_poll)
1359                                         kfree(apoll->double_poll);
1360                                 list_add(&apoll->poll.wait.entry,
1361                                                 &ctx->apoll_cache);
1362                                 req->flags &= ~REQ_F_POLLED;
1363                         }
1364                         if (req->flags & IO_REQ_LINK_FLAGS)
1365                                 io_queue_next(req);
1366                         if (unlikely(req->flags & IO_REQ_CLEAN_FLAGS))
1367                                 io_clean_op(req);
1368                 }
1369                 if (!(req->flags & REQ_F_FIXED_FILE))
1370                         io_put_file(req->file);
1371
1372                 io_req_put_rsrc_locked(req, ctx);
1373
1374                 if (req->task != task) {
1375                         if (task)
1376                                 io_put_task(task, task_refs);
1377                         task = req->task;
1378                         task_refs = 0;
1379                 }
1380                 task_refs++;
1381                 node = req->comp_list.next;
1382                 io_req_add_to_cache(req, ctx);
1383         } while (node);
1384
1385         if (task)
1386                 io_put_task(task, task_refs);
1387 }
1388
1389 static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
1390         __must_hold(&ctx->uring_lock)
1391 {
1392         struct io_wq_work_node *node, *prev;
1393         struct io_submit_state *state = &ctx->submit_state;
1394
1395         if (state->flush_cqes) {
1396                 spin_lock(&ctx->completion_lock);
1397                 wq_list_for_each(node, prev, &state->compl_reqs) {
1398                         struct io_kiocb *req = container_of(node, struct io_kiocb,
1399                                                     comp_list);
1400
1401                         if (!(req->flags & REQ_F_CQE_SKIP))
1402                                 __io_fill_cqe_req(ctx, req);
1403                 }
1404
1405                 io_commit_cqring(ctx);
1406                 spin_unlock(&ctx->completion_lock);
1407                 io_cqring_ev_posted(ctx);
1408                 state->flush_cqes = false;
1409         }
1410
1411         io_free_batch_list(ctx, state->compl_reqs.first);
1412         INIT_WQ_LIST(&state->compl_reqs);
1413 }
1414
1415 /*
1416  * Drop reference to request, return next in chain (if there is one) if this
1417  * was the last reference to this request.
1418  */
1419 static inline struct io_kiocb *io_put_req_find_next(struct io_kiocb *req)
1420 {
1421         struct io_kiocb *nxt = NULL;
1422
1423         if (req_ref_put_and_test(req)) {
1424                 if (unlikely(req->flags & IO_REQ_LINK_FLAGS))
1425                         nxt = io_req_find_next(req);
1426                 io_free_req(req);
1427         }
1428         return nxt;
1429 }
1430
1431 static unsigned io_cqring_events(struct io_ring_ctx *ctx)
1432 {
1433         /* See comment at the top of this file */
1434         smp_rmb();
1435         return __io_cqring_events(ctx);
1436 }
1437
1438 int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
1439 {
1440         struct io_wq_work_node *pos, *start, *prev;
1441         unsigned int poll_flags = BLK_POLL_NOSLEEP;
1442         DEFINE_IO_COMP_BATCH(iob);
1443         int nr_events = 0;
1444
1445         /*
1446          * Only spin for completions if we don't have multiple devices hanging
1447          * off our complete list.
1448          */
1449         if (ctx->poll_multi_queue || force_nonspin)
1450                 poll_flags |= BLK_POLL_ONESHOT;
1451
1452         wq_list_for_each(pos, start, &ctx->iopoll_list) {
1453                 struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
1454                 struct io_rw *rw = io_kiocb_to_cmd(req);
1455                 int ret;
1456
1457                 /*
1458                  * Move completed and retryable entries to our local lists.
1459                  * If we find a request that requires polling, break out
1460                  * and complete those lists first, if we have entries there.
1461                  */
1462                 if (READ_ONCE(req->iopoll_completed))
1463                         break;
1464
1465                 ret = rw->kiocb.ki_filp->f_op->iopoll(&rw->kiocb, &iob, poll_flags);
1466                 if (unlikely(ret < 0))
1467                         return ret;
1468                 else if (ret)
1469                         poll_flags |= BLK_POLL_ONESHOT;
1470
1471                 /* iopoll may have completed current req */
1472                 if (!rq_list_empty(iob.req_list) ||
1473                     READ_ONCE(req->iopoll_completed))
1474                         break;
1475         }
1476
1477         if (!rq_list_empty(iob.req_list))
1478                 iob.complete(&iob);
1479         else if (!pos)
1480                 return 0;
1481
1482         prev = start;
1483         wq_list_for_each_resume(pos, prev) {
1484                 struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
1485
1486                 /* order with io_complete_rw_iopoll(), e.g. ->result updates */
1487                 if (!smp_load_acquire(&req->iopoll_completed))
1488                         break;
1489                 nr_events++;
1490                 if (unlikely(req->flags & REQ_F_CQE_SKIP))
1491                         continue;
1492
1493                 req->cqe.flags = io_put_kbuf(req, 0);
1494                 __io_fill_cqe_req(req->ctx, req);
1495         }
1496
1497         if (unlikely(!nr_events))
1498                 return 0;
1499
1500         io_commit_cqring(ctx);
1501         io_cqring_ev_posted_iopoll(ctx);
1502         pos = start ? start->next : ctx->iopoll_list.first;
1503         wq_list_cut(&ctx->iopoll_list, prev, start);
1504         io_free_batch_list(ctx, pos);
1505         return nr_events;
1506 }
1507
1508 /*
1509  * We can't just wait for polled events to come to us, we have to actively
1510  * find and complete them.
1511  */
1512 static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
1513 {
1514         if (!(ctx->flags & IORING_SETUP_IOPOLL))
1515                 return;
1516
1517         mutex_lock(&ctx->uring_lock);
1518         while (!wq_list_empty(&ctx->iopoll_list)) {
1519                 /* let it sleep and repeat later if can't complete a request */
1520                 if (io_do_iopoll(ctx, true) == 0)
1521                         break;
1522                 /*
1523                  * Ensure we allow local-to-the-cpu processing to take place,
1524                  * in this case we need to ensure that we reap all events.
1525                  * Also let task_work, etc. to progress by releasing the mutex
1526                  */
1527                 if (need_resched()) {
1528                         mutex_unlock(&ctx->uring_lock);
1529                         cond_resched();
1530                         mutex_lock(&ctx->uring_lock);
1531                 }
1532         }
1533         mutex_unlock(&ctx->uring_lock);
1534 }
1535
1536 static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
1537 {
1538         unsigned int nr_events = 0;
1539         int ret = 0;
1540         unsigned long check_cq;
1541
1542         /*
1543          * Don't enter poll loop if we already have events pending.
1544          * If we do, we can potentially be spinning for commands that
1545          * already triggered a CQE (eg in error).
1546          */
1547         check_cq = READ_ONCE(ctx->check_cq);
1548         if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
1549                 __io_cqring_overflow_flush(ctx, false);
1550         if (io_cqring_events(ctx))
1551                 return 0;
1552
1553         /*
1554          * Similarly do not spin if we have not informed the user of any
1555          * dropped CQE.
1556          */
1557         if (unlikely(check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)))
1558                 return -EBADR;
1559
1560         do {
1561                 /*
1562                  * If a submit got punted to a workqueue, we can have the
1563                  * application entering polling for a command before it gets
1564                  * issued. That app will hold the uring_lock for the duration
1565                  * of the poll right here, so we need to take a breather every
1566                  * now and then to ensure that the issue has a chance to add
1567                  * the poll to the issued list. Otherwise we can spin here
1568                  * forever, while the workqueue is stuck trying to acquire the
1569                  * very same mutex.
1570                  */
1571                 if (wq_list_empty(&ctx->iopoll_list)) {
1572                         u32 tail = ctx->cached_cq_tail;
1573
1574                         mutex_unlock(&ctx->uring_lock);
1575                         io_run_task_work();
1576                         mutex_lock(&ctx->uring_lock);
1577
1578                         /* some requests don't go through iopoll_list */
1579                         if (tail != ctx->cached_cq_tail ||
1580                             wq_list_empty(&ctx->iopoll_list))
1581                                 break;
1582                 }
1583                 ret = io_do_iopoll(ctx, !min);
1584                 if (ret < 0)
1585                         break;
1586                 nr_events += ret;
1587                 ret = 0;
1588         } while (nr_events < min && !need_resched());
1589
1590         return ret;
1591 }
1592
1593 static void kiocb_end_write(struct io_kiocb *req)
1594 {
1595         /*
1596          * Tell lockdep we inherited freeze protection from submission
1597          * thread.
1598          */
1599         if (req->flags & REQ_F_ISREG) {
1600                 struct super_block *sb = file_inode(req->file)->i_sb;
1601
1602                 __sb_writers_acquired(sb, SB_FREEZE_WRITE);
1603                 sb_end_write(sb);
1604         }
1605 }
1606
1607 #ifdef CONFIG_BLOCK
1608 static bool io_resubmit_prep(struct io_kiocb *req)
1609 {
1610         struct io_async_rw *io = req->async_data;
1611
1612         if (!req_has_async_data(req))
1613                 return !io_req_prep_async(req);
1614         iov_iter_restore(&io->s.iter, &io->s.iter_state);
1615         return true;
1616 }
1617
1618 static bool io_rw_should_reissue(struct io_kiocb *req)
1619 {
1620         umode_t mode = file_inode(req->file)->i_mode;
1621         struct io_ring_ctx *ctx = req->ctx;
1622
1623         if (!S_ISBLK(mode) && !S_ISREG(mode))
1624                 return false;
1625         if ((req->flags & REQ_F_NOWAIT) || (io_wq_current_is_worker() &&
1626             !(ctx->flags & IORING_SETUP_IOPOLL)))
1627                 return false;
1628         /*
1629          * If ref is dying, we might be running poll reap from the exit work.
1630          * Don't attempt to reissue from that path, just let it fail with
1631          * -EAGAIN.
1632          */
1633         if (percpu_ref_is_dying(&ctx->refs))
1634                 return false;
1635         /*
1636          * Play it safe and assume not safe to re-import and reissue if we're
1637          * not in the original thread group (or in task context).
1638          */
1639         if (!same_thread_group(req->task, current) || !in_task())
1640                 return false;
1641         return true;
1642 }
1643 #else
1644 static bool io_resubmit_prep(struct io_kiocb *req)
1645 {
1646         return false;
1647 }
1648 static bool io_rw_should_reissue(struct io_kiocb *req)
1649 {
1650         return false;
1651 }
1652 #endif
1653
1654 static bool __io_complete_rw_common(struct io_kiocb *req, long res)
1655 {
1656         struct io_rw *rw = io_kiocb_to_cmd(req);
1657
1658         if (rw->kiocb.ki_flags & IOCB_WRITE) {
1659                 kiocb_end_write(req);
1660                 fsnotify_modify(req->file);
1661         } else {
1662                 fsnotify_access(req->file);
1663         }
1664         if (unlikely(res != req->cqe.res)) {
1665                 if ((res == -EAGAIN || res == -EOPNOTSUPP) &&
1666                     io_rw_should_reissue(req)) {
1667                         req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
1668                         return true;
1669                 }
1670                 req_set_fail(req);
1671                 req->cqe.res = res;
1672         }
1673         return false;
1674 }
1675
1676 inline void io_req_task_complete(struct io_kiocb *req, bool *locked)
1677 {
1678         if (*locked) {
1679                 req->cqe.flags |= io_put_kbuf(req, 0);
1680                 req->flags |= REQ_F_COMPLETE_INLINE;
1681                 io_req_add_compl_list(req);
1682         } else {
1683                 req->cqe.flags |= io_put_kbuf(req, IO_URING_F_UNLOCKED);
1684                 io_req_complete_post(req);
1685         }
1686 }
1687
1688 static void __io_complete_rw(struct io_kiocb *req, long res,
1689                              unsigned int issue_flags)
1690 {
1691         if (__io_complete_rw_common(req, res))
1692                 return;
1693         io_req_set_res(req, req->cqe.res, io_put_kbuf(req, issue_flags));
1694         __io_req_complete(req, issue_flags);
1695 }
1696
1697 static void io_complete_rw(struct kiocb *kiocb, long res)
1698 {
1699         struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
1700         struct io_kiocb *req = cmd_to_io_kiocb(rw);
1701
1702         if (__io_complete_rw_common(req, res))
1703                 return;
1704         io_req_set_res(req, res, 0);
1705         req->io_task_work.func = io_req_task_complete;
1706         io_req_task_prio_work_add(req);
1707 }
1708
1709 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)
1710 {
1711         struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
1712         struct io_kiocb *req = cmd_to_io_kiocb(rw);
1713
1714         if (kiocb->ki_flags & IOCB_WRITE)
1715                 kiocb_end_write(req);
1716         if (unlikely(res != req->cqe.res)) {
1717                 if (res == -EAGAIN && io_rw_should_reissue(req)) {
1718                         req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
1719                         return;
1720                 }
1721                 req->cqe.res = res;
1722         }
1723
1724         /* order with io_iopoll_complete() checking ->iopoll_completed */
1725         smp_store_release(&req->iopoll_completed, 1);
1726 }
1727
1728 /*
1729  * After the iocb has been issued, it's safe to be found on the poll list.
1730  * Adding the kiocb to the list AFTER submission ensures that we don't
1731  * find it from a io_do_iopoll() thread before the issuer is done
1732  * accessing the kiocb cookie.
1733  */
1734 static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
1735 {
1736         struct io_ring_ctx *ctx = req->ctx;
1737         const bool needs_lock = issue_flags & IO_URING_F_UNLOCKED;
1738
1739         /* workqueue context doesn't hold uring_lock, grab it now */
1740         if (unlikely(needs_lock))
1741                 mutex_lock(&ctx->uring_lock);
1742
1743         /*
1744          * Track whether we have multiple files in our lists. This will impact
1745          * how we do polling eventually, not spinning if we're on potentially
1746          * different devices.
1747          */
1748         if (wq_list_empty(&ctx->iopoll_list)) {
1749                 ctx->poll_multi_queue = false;
1750         } else if (!ctx->poll_multi_queue) {
1751                 struct io_kiocb *list_req;
1752
1753                 list_req = container_of(ctx->iopoll_list.first, struct io_kiocb,
1754                                         comp_list);
1755                 if (list_req->file != req->file)
1756                         ctx->poll_multi_queue = true;
1757         }
1758
1759         /*
1760          * For fast devices, IO may have already completed. If it has, add
1761          * it to the front so we find it first.
1762          */
1763         if (READ_ONCE(req->iopoll_completed))
1764                 wq_list_add_head(&req->comp_list, &ctx->iopoll_list);
1765         else
1766                 wq_list_add_tail(&req->comp_list, &ctx->iopoll_list);
1767
1768         if (unlikely(needs_lock)) {
1769                 /*
1770                  * If IORING_SETUP_SQPOLL is enabled, sqes are either handle
1771                  * in sq thread task context or in io worker task context. If
1772                  * current task context is sq thread, we don't need to check
1773                  * whether should wake up sq thread.
1774                  */
1775                 if ((ctx->flags & IORING_SETUP_SQPOLL) &&
1776                     wq_has_sleeper(&ctx->sq_data->wait))
1777                         wake_up(&ctx->sq_data->wait);
1778
1779                 mutex_unlock(&ctx->uring_lock);
1780         }
1781 }
1782
1783 static bool io_bdev_nowait(struct block_device *bdev)
1784 {
1785         return !bdev || blk_queue_nowait(bdev_get_queue(bdev));
1786 }
1787
1788 /*
1789  * If we tracked the file through the SCM inflight mechanism, we could support
1790  * any file. For now, just ensure that anything potentially problematic is done
1791  * inline.
1792  */
1793 static bool __io_file_supports_nowait(struct file *file, umode_t mode)
1794 {
1795         if (S_ISBLK(mode)) {
1796                 if (IS_ENABLED(CONFIG_BLOCK) &&
1797                     io_bdev_nowait(I_BDEV(file->f_mapping->host)))
1798                         return true;
1799                 return false;
1800         }
1801         if (S_ISSOCK(mode))
1802                 return true;
1803         if (S_ISREG(mode)) {
1804                 if (IS_ENABLED(CONFIG_BLOCK) &&
1805                     io_bdev_nowait(file->f_inode->i_sb->s_bdev) &&
1806                     !io_is_uring_fops(file))
1807                         return true;
1808                 return false;
1809         }
1810
1811         /* any ->read/write should understand O_NONBLOCK */
1812         if (file->f_flags & O_NONBLOCK)
1813                 return true;
1814         return file->f_mode & FMODE_NOWAIT;
1815 }
1816
1817 /*
1818  * If we tracked the file through the SCM inflight mechanism, we could support
1819  * any file. For now, just ensure that anything potentially problematic is done
1820  * inline.
1821  */
1822 unsigned int io_file_get_flags(struct file *file)
1823 {
1824         umode_t mode = file_inode(file)->i_mode;
1825         unsigned int res = 0;
1826
1827         if (S_ISREG(mode))
1828                 res |= FFS_ISREG;
1829         if (__io_file_supports_nowait(file, mode))
1830                 res |= FFS_NOWAIT;
1831         if (io_file_need_scm(file))
1832                 res |= FFS_SCM;
1833         return res;
1834 }
1835
1836 static inline bool io_file_supports_nowait(struct io_kiocb *req)
1837 {
1838         return req->flags & REQ_F_SUPPORT_NOWAIT;
1839 }
1840
1841 static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1842 {
1843         struct io_rw *rw = io_kiocb_to_cmd(req);
1844         unsigned ioprio;
1845         int ret;
1846
1847         rw->kiocb.ki_pos = READ_ONCE(sqe->off);
1848         /* used for fixed read/write too - just read unconditionally */
1849         req->buf_index = READ_ONCE(sqe->buf_index);
1850
1851         if (req->opcode == IORING_OP_READ_FIXED ||
1852             req->opcode == IORING_OP_WRITE_FIXED) {
1853                 struct io_ring_ctx *ctx = req->ctx;
1854                 u16 index;
1855
1856                 if (unlikely(req->buf_index >= ctx->nr_user_bufs))
1857                         return -EFAULT;
1858                 index = array_index_nospec(req->buf_index, ctx->nr_user_bufs);
1859                 req->imu = ctx->user_bufs[index];
1860                 io_req_set_rsrc_node(req, ctx, 0);
1861         }
1862
1863         ioprio = READ_ONCE(sqe->ioprio);
1864         if (ioprio) {
1865                 ret = ioprio_check_cap(ioprio);
1866                 if (ret)
1867                         return ret;
1868
1869                 rw->kiocb.ki_ioprio = ioprio;
1870         } else {
1871                 rw->kiocb.ki_ioprio = get_current_ioprio();
1872         }
1873
1874         rw->addr = READ_ONCE(sqe->addr);
1875         rw->len = READ_ONCE(sqe->len);
1876         rw->flags = READ_ONCE(sqe->rw_flags);
1877         return 0;
1878 }
1879
1880 static void io_readv_writev_cleanup(struct io_kiocb *req)
1881 {
1882         struct io_async_rw *io = req->async_data;
1883
1884         kfree(io->free_iovec);
1885 }
1886
1887 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1888 {
1889         switch (ret) {
1890         case -EIOCBQUEUED:
1891                 break;
1892         case -ERESTARTSYS:
1893         case -ERESTARTNOINTR:
1894         case -ERESTARTNOHAND:
1895         case -ERESTART_RESTARTBLOCK:
1896                 /*
1897                  * We can't just restart the syscall, since previously
1898                  * submitted sqes may already be in progress. Just fail this
1899                  * IO with EINTR.
1900                  */
1901                 ret = -EINTR;
1902                 fallthrough;
1903         default:
1904                 kiocb->ki_complete(kiocb, ret);
1905         }
1906 }
1907
1908 static inline loff_t *io_kiocb_update_pos(struct io_kiocb *req)
1909 {
1910         struct io_rw *rw = io_kiocb_to_cmd(req);
1911
1912         if (rw->kiocb.ki_pos != -1)
1913                 return &rw->kiocb.ki_pos;
1914
1915         if (!(req->file->f_mode & FMODE_STREAM)) {
1916                 req->flags |= REQ_F_CUR_POS;
1917                 rw->kiocb.ki_pos = req->file->f_pos;
1918                 return &rw->kiocb.ki_pos;
1919         }
1920
1921         rw->kiocb.ki_pos = 0;
1922         return NULL;
1923 }
1924
1925 static void kiocb_done(struct io_kiocb *req, ssize_t ret,
1926                        unsigned int issue_flags)
1927 {
1928         struct io_async_rw *io = req->async_data;
1929         struct io_rw *rw = io_kiocb_to_cmd(req);
1930
1931         /* add previously done IO, if any */
1932         if (req_has_async_data(req) && io->bytes_done > 0) {
1933                 if (ret < 0)
1934                         ret = io->bytes_done;
1935                 else
1936                         ret += io->bytes_done;
1937         }
1938
1939         if (req->flags & REQ_F_CUR_POS)
1940                 req->file->f_pos = rw->kiocb.ki_pos;
1941         if (ret >= 0 && (rw->kiocb.ki_complete == io_complete_rw))
1942                 __io_complete_rw(req, ret, issue_flags);
1943         else
1944                 io_rw_done(&rw->kiocb, ret);
1945
1946         if (req->flags & REQ_F_REISSUE) {
1947                 req->flags &= ~REQ_F_REISSUE;
1948                 if (io_resubmit_prep(req))
1949                         io_req_task_queue_reissue(req);
1950                 else
1951                         io_req_task_queue_fail(req, ret);
1952         }
1953 }
1954
1955 static int __io_import_fixed(struct io_kiocb *req, int ddir,
1956                              struct iov_iter *iter, struct io_mapped_ubuf *imu)
1957 {
1958         struct io_rw *rw = io_kiocb_to_cmd(req);
1959         size_t len = rw->len;
1960         u64 buf_end, buf_addr = rw->addr;
1961         size_t offset;
1962
1963         if (unlikely(check_add_overflow(buf_addr, (u64)len, &buf_end)))
1964                 return -EFAULT;
1965         /* not inside the mapped region */
1966         if (unlikely(buf_addr < imu->ubuf || buf_end > imu->ubuf_end))
1967                 return -EFAULT;
1968
1969         /*
1970          * May not be a start of buffer, set size appropriately
1971          * and advance us to the beginning.
1972          */
1973         offset = buf_addr - imu->ubuf;
1974         iov_iter_bvec(iter, ddir, imu->bvec, imu->nr_bvecs, offset + len);
1975
1976         if (offset) {
1977                 /*
1978                  * Don't use iov_iter_advance() here, as it's really slow for
1979                  * using the latter parts of a big fixed buffer - it iterates
1980                  * over each segment manually. We can cheat a bit here, because
1981                  * we know that:
1982                  *
1983                  * 1) it's a BVEC iter, we set it up
1984                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1985                  *    first and last bvec
1986                  *
1987                  * So just find our index, and adjust the iterator afterwards.
1988                  * If the offset is within the first bvec (or the whole first
1989                  * bvec, just use iov_iter_advance(). This makes it easier
1990                  * since we can just skip the first segment, which may not
1991                  * be PAGE_SIZE aligned.
1992                  */
1993                 const struct bio_vec *bvec = imu->bvec;
1994
1995                 if (offset <= bvec->bv_len) {
1996                         iov_iter_advance(iter, offset);
1997                 } else {
1998                         unsigned long seg_skip;
1999
2000                         /* skip first vec */
2001                         offset -= bvec->bv_len;
2002                         seg_skip = 1 + (offset >> PAGE_SHIFT);
2003
2004                         iter->bvec = bvec + seg_skip;
2005                         iter->nr_segs -= seg_skip;
2006                         iter->count -= bvec->bv_len + offset;
2007                         iter->iov_offset = offset & ~PAGE_MASK;
2008                 }
2009         }
2010
2011         return 0;
2012 }
2013
2014 static int io_import_fixed(struct io_kiocb *req, int rw, struct iov_iter *iter,
2015                            unsigned int issue_flags)
2016 {
2017         if (WARN_ON_ONCE(!req->imu))
2018                 return -EFAULT;
2019         return __io_import_fixed(req, rw, iter, req->imu);
2020 }
2021
2022 #ifdef CONFIG_COMPAT
2023 static ssize_t io_compat_import(struct io_kiocb *req, struct iovec *iov,
2024                                 unsigned int issue_flags)
2025 {
2026         struct io_rw *rw = io_kiocb_to_cmd(req);
2027         struct compat_iovec __user *uiov;
2028         compat_ssize_t clen;
2029         void __user *buf;
2030         size_t len;
2031
2032         uiov = u64_to_user_ptr(rw->addr);
2033         if (!access_ok(uiov, sizeof(*uiov)))
2034                 return -EFAULT;
2035         if (__get_user(clen, &uiov->iov_len))
2036                 return -EFAULT;
2037         if (clen < 0)
2038                 return -EINVAL;
2039
2040         len = clen;
2041         buf = io_buffer_select(req, &len, issue_flags);
2042         if (!buf)
2043                 return -ENOBUFS;
2044         rw->addr = (unsigned long) buf;
2045         iov[0].iov_base = buf;
2046         rw->len = iov[0].iov_len = (compat_size_t) len;
2047         return 0;
2048 }
2049 #endif
2050
2051 static ssize_t __io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
2052                                       unsigned int issue_flags)
2053 {
2054         struct io_rw *rw = io_kiocb_to_cmd(req);
2055         struct iovec __user *uiov = u64_to_user_ptr(rw->addr);
2056         void __user *buf;
2057         ssize_t len;
2058
2059         if (copy_from_user(iov, uiov, sizeof(*uiov)))
2060                 return -EFAULT;
2061
2062         len = iov[0].iov_len;
2063         if (len < 0)
2064                 return -EINVAL;
2065         buf = io_buffer_select(req, &len, issue_flags);
2066         if (!buf)
2067                 return -ENOBUFS;
2068         rw->addr = (unsigned long) buf;
2069         iov[0].iov_base = buf;
2070         rw->len = iov[0].iov_len = len;
2071         return 0;
2072 }
2073
2074 static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
2075                                     unsigned int issue_flags)
2076 {
2077         struct io_rw *rw = io_kiocb_to_cmd(req);
2078
2079         if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
2080                 iov[0].iov_base = u64_to_user_ptr(rw->addr);
2081                 iov[0].iov_len = rw->len;
2082                 return 0;
2083         }
2084         if (rw->len != 1)
2085                 return -EINVAL;
2086
2087 #ifdef CONFIG_COMPAT
2088         if (req->ctx->compat)
2089                 return io_compat_import(req, iov, issue_flags);
2090 #endif
2091
2092         return __io_iov_buffer_select(req, iov, issue_flags);
2093 }
2094
2095 static struct iovec *__io_import_iovec(int ddir, struct io_kiocb *req,
2096                                        struct io_rw_state *s,
2097                                        unsigned int issue_flags)
2098 {
2099         struct io_rw *rw = io_kiocb_to_cmd(req);
2100         struct iov_iter *iter = &s->iter;
2101         u8 opcode = req->opcode;
2102         struct iovec *iovec;
2103         void __user *buf;
2104         size_t sqe_len;
2105         ssize_t ret;
2106
2107         if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
2108                 ret = io_import_fixed(req, ddir, iter, issue_flags);
2109                 if (ret)
2110                         return ERR_PTR(ret);
2111                 return NULL;
2112         }
2113
2114         buf = u64_to_user_ptr(rw->addr);
2115         sqe_len = rw->len;
2116
2117         if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
2118                 if (io_do_buffer_select(req)) {
2119                         buf = io_buffer_select(req, &sqe_len, issue_flags);
2120                         if (!buf)
2121                                 return ERR_PTR(-ENOBUFS);
2122                         rw->addr = (unsigned long) buf;
2123                         rw->len = sqe_len;
2124                 }
2125
2126                 ret = import_single_range(ddir, buf, sqe_len, s->fast_iov, iter);
2127                 if (ret)
2128                         return ERR_PTR(ret);
2129                 return NULL;
2130         }
2131
2132         iovec = s->fast_iov;
2133         if (req->flags & REQ_F_BUFFER_SELECT) {
2134                 ret = io_iov_buffer_select(req, iovec, issue_flags);
2135                 if (ret)
2136                         return ERR_PTR(ret);
2137                 iov_iter_init(iter, ddir, iovec, 1, iovec->iov_len);
2138                 return NULL;
2139         }
2140
2141         ret = __import_iovec(ddir, buf, sqe_len, UIO_FASTIOV, &iovec, iter,
2142                               req->ctx->compat);
2143         if (unlikely(ret < 0))
2144                 return ERR_PTR(ret);
2145         return iovec;
2146 }
2147
2148 static inline int io_import_iovec(int rw, struct io_kiocb *req,
2149                                   struct iovec **iovec, struct io_rw_state *s,
2150                                   unsigned int issue_flags)
2151 {
2152         *iovec = __io_import_iovec(rw, req, s, issue_flags);
2153         if (unlikely(IS_ERR(*iovec)))
2154                 return PTR_ERR(*iovec);
2155
2156         iov_iter_save_state(&s->iter, &s->iter_state);
2157         return 0;
2158 }
2159
2160 static inline loff_t *io_kiocb_ppos(struct kiocb *kiocb)
2161 {
2162         return (kiocb->ki_filp->f_mode & FMODE_STREAM) ? NULL : &kiocb->ki_pos;
2163 }
2164
2165 /*
2166  * For files that don't have ->read_iter() and ->write_iter(), handle them
2167  * by looping over ->read() or ->write() manually.
2168  */
2169 static ssize_t loop_rw_iter(int ddir, struct io_rw *rw, struct iov_iter *iter)
2170 {
2171         struct kiocb *kiocb = &rw->kiocb;
2172         struct file *file = kiocb->ki_filp;
2173         ssize_t ret = 0;
2174         loff_t *ppos;
2175
2176         /*
2177          * Don't support polled IO through this interface, and we can't
2178          * support non-blocking either. For the latter, this just causes
2179          * the kiocb to be handled from an async context.
2180          */
2181         if (kiocb->ki_flags & IOCB_HIPRI)
2182                 return -EOPNOTSUPP;
2183         if ((kiocb->ki_flags & IOCB_NOWAIT) &&
2184             !(kiocb->ki_filp->f_flags & O_NONBLOCK))
2185                 return -EAGAIN;
2186
2187         ppos = io_kiocb_ppos(kiocb);
2188
2189         while (iov_iter_count(iter)) {
2190                 struct iovec iovec;
2191                 ssize_t nr;
2192
2193                 if (!iov_iter_is_bvec(iter)) {
2194                         iovec = iov_iter_iovec(iter);
2195                 } else {
2196                         iovec.iov_base = u64_to_user_ptr(rw->addr);
2197                         iovec.iov_len = rw->len;
2198                 }
2199
2200                 if (ddir == READ) {
2201                         nr = file->f_op->read(file, iovec.iov_base,
2202                                               iovec.iov_len, ppos);
2203                 } else {
2204                         nr = file->f_op->write(file, iovec.iov_base,
2205                                                iovec.iov_len, ppos);
2206                 }
2207
2208                 if (nr < 0) {
2209                         if (!ret)
2210                                 ret = nr;
2211                         break;
2212                 }
2213                 ret += nr;
2214                 if (!iov_iter_is_bvec(iter)) {
2215                         iov_iter_advance(iter, nr);
2216                 } else {
2217                         rw->addr += nr;
2218                         rw->len -= nr;
2219                         if (!rw->len)
2220                                 break;
2221                 }
2222                 if (nr != iovec.iov_len)
2223                         break;
2224         }
2225
2226         return ret;
2227 }
2228
2229 static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec,
2230                           const struct iovec *fast_iov, struct iov_iter *iter)
2231 {
2232         struct io_async_rw *io = req->async_data;
2233
2234         memcpy(&io->s.iter, iter, sizeof(*iter));
2235         io->free_iovec = iovec;
2236         io->bytes_done = 0;
2237         /* can only be fixed buffers, no need to do anything */
2238         if (iov_iter_is_bvec(iter))
2239                 return;
2240         if (!iovec) {
2241                 unsigned iov_off = 0;
2242
2243                 io->s.iter.iov = io->s.fast_iov;
2244                 if (iter->iov != fast_iov) {
2245                         iov_off = iter->iov - fast_iov;
2246                         io->s.iter.iov += iov_off;
2247                 }
2248                 if (io->s.fast_iov != fast_iov)
2249                         memcpy(io->s.fast_iov + iov_off, fast_iov + iov_off,
2250                                sizeof(struct iovec) * iter->nr_segs);
2251         } else {
2252                 req->flags |= REQ_F_NEED_CLEANUP;
2253         }
2254 }
2255
2256 bool io_alloc_async_data(struct io_kiocb *req)
2257 {
2258         WARN_ON_ONCE(!io_op_defs[req->opcode].async_size);
2259         req->async_data = kmalloc(io_op_defs[req->opcode].async_size, GFP_KERNEL);
2260         if (req->async_data) {
2261                 req->flags |= REQ_F_ASYNC_DATA;
2262                 return false;
2263         }
2264         return true;
2265 }
2266
2267 static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec,
2268                              struct io_rw_state *s, bool force)
2269 {
2270         if (!force && !io_op_defs[req->opcode].prep_async)
2271                 return 0;
2272         if (!req_has_async_data(req)) {
2273                 struct io_async_rw *iorw;
2274
2275                 if (io_alloc_async_data(req)) {
2276                         kfree(iovec);
2277                         return -ENOMEM;
2278                 }
2279
2280                 io_req_map_rw(req, iovec, s->fast_iov, &s->iter);
2281                 iorw = req->async_data;
2282                 /* we've copied and mapped the iter, ensure state is saved */
2283                 iov_iter_save_state(&iorw->s.iter, &iorw->s.iter_state);
2284         }
2285         return 0;
2286 }
2287
2288 static inline int io_rw_prep_async(struct io_kiocb *req, int rw)
2289 {
2290         struct io_async_rw *iorw = req->async_data;
2291         struct iovec *iov;
2292         int ret;
2293
2294         /* submission path, ->uring_lock should already be taken */
2295         ret = io_import_iovec(rw, req, &iov, &iorw->s, 0);
2296         if (unlikely(ret < 0))
2297                 return ret;
2298
2299         iorw->bytes_done = 0;
2300         iorw->free_iovec = iov;
2301         if (iov)
2302                 req->flags |= REQ_F_NEED_CLEANUP;
2303         return 0;
2304 }
2305
2306 static int io_readv_prep_async(struct io_kiocb *req)
2307 {
2308         return io_rw_prep_async(req, READ);
2309 }
2310
2311 static int io_writev_prep_async(struct io_kiocb *req)
2312 {
2313         return io_rw_prep_async(req, WRITE);
2314 }
2315
2316 /*
2317  * This is our waitqueue callback handler, registered through __folio_lock_async()
2318  * when we initially tried to do the IO with the iocb armed our waitqueue.
2319  * This gets called when the page is unlocked, and we generally expect that to
2320  * happen when the page IO is completed and the page is now uptodate. This will
2321  * queue a task_work based retry of the operation, attempting to copy the data
2322  * again. If the latter fails because the page was NOT uptodate, then we will
2323  * do a thread based blocking retry of the operation. That's the unexpected
2324  * slow path.
2325  */
2326 static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
2327                              int sync, void *arg)
2328 {
2329         struct wait_page_queue *wpq;
2330         struct io_kiocb *req = wait->private;
2331         struct io_rw *rw = io_kiocb_to_cmd(req);
2332         struct wait_page_key *key = arg;
2333
2334         wpq = container_of(wait, struct wait_page_queue, wait);
2335
2336         if (!wake_page_match(wpq, key))
2337                 return 0;
2338
2339         rw->kiocb.ki_flags &= ~IOCB_WAITQ;
2340         list_del_init(&wait->entry);
2341         io_req_task_queue(req);
2342         return 1;
2343 }
2344
2345 /*
2346  * This controls whether a given IO request should be armed for async page
2347  * based retry. If we return false here, the request is handed to the async
2348  * worker threads for retry. If we're doing buffered reads on a regular file,
2349  * we prepare a private wait_page_queue entry and retry the operation. This
2350  * will either succeed because the page is now uptodate and unlocked, or it
2351  * will register a callback when the page is unlocked at IO completion. Through
2352  * that callback, io_uring uses task_work to setup a retry of the operation.
2353  * That retry will attempt the buffered read again. The retry will generally
2354  * succeed, or in rare cases where it fails, we then fall back to using the
2355  * async worker threads for a blocking retry.
2356  */
2357 static bool io_rw_should_retry(struct io_kiocb *req)
2358 {
2359         struct io_async_rw *io = req->async_data;
2360         struct wait_page_queue *wait = &io->wpq;
2361         struct io_rw *rw = io_kiocb_to_cmd(req);
2362         struct kiocb *kiocb = &rw->kiocb;
2363
2364         /* never retry for NOWAIT, we just complete with -EAGAIN */
2365         if (req->flags & REQ_F_NOWAIT)
2366                 return false;
2367
2368         /* Only for buffered IO */
2369         if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_HIPRI))
2370                 return false;
2371
2372         /*
2373          * just use poll if we can, and don't attempt if the fs doesn't
2374          * support callback based unlocks
2375          */
2376         if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
2377                 return false;
2378
2379         wait->wait.func = io_async_buf_func;
2380         wait->wait.private = req;
2381         wait->wait.flags = 0;
2382         INIT_LIST_HEAD(&wait->wait.entry);
2383         kiocb->ki_flags |= IOCB_WAITQ;
2384         kiocb->ki_flags &= ~IOCB_NOWAIT;
2385         kiocb->ki_waitq = wait;
2386         return true;
2387 }
2388
2389 static inline int io_iter_do_read(struct io_rw *rw, struct iov_iter *iter)
2390 {
2391         struct file *file = rw->kiocb.ki_filp;
2392
2393         if (likely(file->f_op->read_iter))
2394                 return call_read_iter(file, &rw->kiocb, iter);
2395         else if (file->f_op->read)
2396                 return loop_rw_iter(READ, rw, iter);
2397         else
2398                 return -EINVAL;
2399 }
2400
2401 static bool need_read_all(struct io_kiocb *req)
2402 {
2403         return req->flags & REQ_F_ISREG ||
2404                 S_ISBLK(file_inode(req->file)->i_mode);
2405 }
2406
2407 static int io_rw_init_file(struct io_kiocb *req, fmode_t mode)
2408 {
2409         struct io_rw *rw = io_kiocb_to_cmd(req);
2410         struct kiocb *kiocb = &rw->kiocb;
2411         struct io_ring_ctx *ctx = req->ctx;
2412         struct file *file = req->file;
2413         int ret;
2414
2415         if (unlikely(!file || !(file->f_mode & mode)))
2416                 return -EBADF;
2417
2418         if (!io_req_ffs_set(req))
2419                 req->flags |= io_file_get_flags(file) << REQ_F_SUPPORT_NOWAIT_BIT;
2420
2421         kiocb->ki_flags = iocb_flags(file);
2422         ret = kiocb_set_rw_flags(kiocb, rw->flags);
2423         if (unlikely(ret))
2424                 return ret;
2425
2426         /*
2427          * If the file is marked O_NONBLOCK, still allow retry for it if it
2428          * supports async. Otherwise it's impossible to use O_NONBLOCK files
2429          * reliably. If not, or it IOCB_NOWAIT is set, don't retry.
2430          */
2431         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
2432             ((file->f_flags & O_NONBLOCK) && !io_file_supports_nowait(req)))
2433                 req->flags |= REQ_F_NOWAIT;
2434
2435         if (ctx->flags & IORING_SETUP_IOPOLL) {
2436                 if (!(kiocb->ki_flags & IOCB_DIRECT) || !file->f_op->iopoll)
2437                         return -EOPNOTSUPP;
2438
2439                 kiocb->private = NULL;
2440                 kiocb->ki_flags |= IOCB_HIPRI | IOCB_ALLOC_CACHE;
2441                 kiocb->ki_complete = io_complete_rw_iopoll;
2442                 req->iopoll_completed = 0;
2443         } else {
2444                 if (kiocb->ki_flags & IOCB_HIPRI)
2445                         return -EINVAL;
2446                 kiocb->ki_complete = io_complete_rw;
2447         }
2448
2449         return 0;
2450 }
2451
2452 static int io_read(struct io_kiocb *req, unsigned int issue_flags)
2453 {
2454         struct io_rw *rw = io_kiocb_to_cmd(req);
2455         struct io_rw_state __s, *s = &__s;
2456         struct iovec *iovec;
2457         struct kiocb *kiocb = &rw->kiocb;
2458         bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
2459         struct io_async_rw *io;
2460         ssize_t ret, ret2;
2461         loff_t *ppos;
2462
2463         if (!req_has_async_data(req)) {
2464                 ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
2465                 if (unlikely(ret < 0))
2466                         return ret;
2467         } else {
2468                 io = req->async_data;
2469                 s = &io->s;
2470
2471                 /*
2472                  * Safe and required to re-import if we're using provided
2473                  * buffers, as we dropped the selected one before retry.
2474                  */
2475                 if (io_do_buffer_select(req)) {
2476                         ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
2477                         if (unlikely(ret < 0))
2478                                 return ret;
2479                 }
2480
2481                 /*
2482                  * We come here from an earlier attempt, restore our state to
2483                  * match in case it doesn't. It's cheap enough that we don't
2484                  * need to make this conditional.
2485                  */
2486                 iov_iter_restore(&s->iter, &s->iter_state);
2487                 iovec = NULL;
2488         }
2489         ret = io_rw_init_file(req, FMODE_READ);
2490         if (unlikely(ret)) {
2491                 kfree(iovec);
2492                 return ret;
2493         }
2494         req->cqe.res = iov_iter_count(&s->iter);
2495
2496         if (force_nonblock) {
2497                 /* If the file doesn't support async, just async punt */
2498                 if (unlikely(!io_file_supports_nowait(req))) {
2499                         ret = io_setup_async_rw(req, iovec, s, true);
2500                         return ret ?: -EAGAIN;
2501                 }
2502                 kiocb->ki_flags |= IOCB_NOWAIT;
2503         } else {
2504                 /* Ensure we clear previously set non-block flag */
2505                 kiocb->ki_flags &= ~IOCB_NOWAIT;
2506         }
2507
2508         ppos = io_kiocb_update_pos(req);
2509
2510         ret = rw_verify_area(READ, req->file, ppos, req->cqe.res);
2511         if (unlikely(ret)) {
2512                 kfree(iovec);
2513                 return ret;
2514         }
2515
2516         ret = io_iter_do_read(rw, &s->iter);
2517
2518         if (ret == -EAGAIN || (req->flags & REQ_F_REISSUE)) {
2519                 req->flags &= ~REQ_F_REISSUE;
2520                 /* if we can poll, just do that */
2521                 if (req->opcode == IORING_OP_READ && file_can_poll(req->file))
2522                         return -EAGAIN;
2523                 /* IOPOLL retry should happen for io-wq threads */
2524                 if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL))
2525                         goto done;
2526                 /* no retry on NONBLOCK nor RWF_NOWAIT */
2527                 if (req->flags & REQ_F_NOWAIT)
2528                         goto done;
2529                 ret = 0;
2530         } else if (ret == -EIOCBQUEUED) {
2531                 goto out_free;
2532         } else if (ret == req->cqe.res || ret <= 0 || !force_nonblock ||
2533                    (req->flags & REQ_F_NOWAIT) || !need_read_all(req)) {
2534                 /* read all, failed, already did sync or don't want to retry */
2535                 goto done;
2536         }
2537
2538         /*
2539          * Don't depend on the iter state matching what was consumed, or being
2540          * untouched in case of error. Restore it and we'll advance it
2541          * manually if we need to.
2542          */
2543         iov_iter_restore(&s->iter, &s->iter_state);
2544
2545         ret2 = io_setup_async_rw(req, iovec, s, true);
2546         if (ret2)
2547                 return ret2;
2548
2549         iovec = NULL;
2550         io = req->async_data;
2551         s = &io->s;
2552         /*
2553          * Now use our persistent iterator and state, if we aren't already.
2554          * We've restored and mapped the iter to match.
2555          */
2556
2557         do {
2558                 /*
2559                  * We end up here because of a partial read, either from
2560                  * above or inside this loop. Advance the iter by the bytes
2561                  * that were consumed.
2562                  */
2563                 iov_iter_advance(&s->iter, ret);
2564                 if (!iov_iter_count(&s->iter))
2565                         break;
2566                 io->bytes_done += ret;
2567                 iov_iter_save_state(&s->iter, &s->iter_state);
2568
2569                 /* if we can retry, do so with the callbacks armed */
2570                 if (!io_rw_should_retry(req)) {
2571                         kiocb->ki_flags &= ~IOCB_WAITQ;
2572                         return -EAGAIN;
2573                 }
2574
2575                 /*
2576                  * Now retry read with the IOCB_WAITQ parts set in the iocb. If
2577                  * we get -EIOCBQUEUED, then we'll get a notification when the
2578                  * desired page gets unlocked. We can also get a partial read
2579                  * here, and if we do, then just retry at the new offset.
2580                  */
2581                 ret = io_iter_do_read(rw, &s->iter);
2582                 if (ret == -EIOCBQUEUED)
2583                         return IOU_ISSUE_SKIP_COMPLETE;
2584                 /* we got some bytes, but not all. retry. */
2585                 kiocb->ki_flags &= ~IOCB_WAITQ;
2586                 iov_iter_restore(&s->iter, &s->iter_state);
2587         } while (ret > 0);
2588 done:
2589         kiocb_done(req, ret, issue_flags);
2590 out_free:
2591         /* it's faster to check here then delegate to kfree */
2592         if (iovec)
2593                 kfree(iovec);
2594         return IOU_ISSUE_SKIP_COMPLETE;
2595 }
2596
2597 static int io_write(struct io_kiocb *req, unsigned int issue_flags)
2598 {
2599         struct io_rw *rw = io_kiocb_to_cmd(req);
2600         struct io_rw_state __s, *s = &__s;
2601         struct iovec *iovec;
2602         struct kiocb *kiocb = &rw->kiocb;
2603         bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
2604         ssize_t ret, ret2;
2605         loff_t *ppos;
2606
2607         if (!req_has_async_data(req)) {
2608                 ret = io_import_iovec(WRITE, req, &iovec, s, issue_flags);
2609                 if (unlikely(ret < 0))
2610                         return ret;
2611         } else {
2612                 struct io_async_rw *io = req->async_data;
2613
2614                 s = &io->s;
2615                 iov_iter_restore(&s->iter, &s->iter_state);
2616                 iovec = NULL;
2617         }
2618         ret = io_rw_init_file(req, FMODE_WRITE);
2619         if (unlikely(ret)) {
2620                 kfree(iovec);
2621                 return ret;
2622         }
2623         req->cqe.res = iov_iter_count(&s->iter);
2624
2625         if (force_nonblock) {
2626                 /* If the file doesn't support async, just async punt */
2627                 if (unlikely(!io_file_supports_nowait(req)))
2628                         goto copy_iov;
2629
2630                 /* file path doesn't support NOWAIT for non-direct_IO */
2631                 if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
2632                     (req->flags & REQ_F_ISREG))
2633                         goto copy_iov;
2634
2635                 kiocb->ki_flags |= IOCB_NOWAIT;
2636         } else {
2637                 /* Ensure we clear previously set non-block flag */
2638                 kiocb->ki_flags &= ~IOCB_NOWAIT;
2639         }
2640
2641         ppos = io_kiocb_update_pos(req);
2642
2643         ret = rw_verify_area(WRITE, req->file, ppos, req->cqe.res);
2644         if (unlikely(ret))
2645                 goto out_free;
2646
2647         /*
2648          * Open-code file_start_write here to grab freeze protection,
2649          * which will be released by another thread in
2650          * io_complete_rw().  Fool lockdep by telling it the lock got
2651          * released so that it doesn't complain about the held lock when
2652          * we return to userspace.
2653          */
2654         if (req->flags & REQ_F_ISREG) {
2655                 sb_start_write(file_inode(req->file)->i_sb);
2656                 __sb_writers_release(file_inode(req->file)->i_sb,
2657                                         SB_FREEZE_WRITE);
2658         }
2659         kiocb->ki_flags |= IOCB_WRITE;
2660
2661         if (likely(req->file->f_op->write_iter))
2662                 ret2 = call_write_iter(req->file, kiocb, &s->iter);
2663         else if (req->file->f_op->write)
2664                 ret2 = loop_rw_iter(WRITE, rw, &s->iter);
2665         else
2666                 ret2 = -EINVAL;
2667
2668         if (req->flags & REQ_F_REISSUE) {
2669                 req->flags &= ~REQ_F_REISSUE;
2670                 ret2 = -EAGAIN;
2671         }
2672
2673         /*
2674          * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just
2675          * retry them without IOCB_NOWAIT.
2676          */
2677         if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT))
2678                 ret2 = -EAGAIN;
2679         /* no retry on NONBLOCK nor RWF_NOWAIT */
2680         if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT))
2681                 goto done;
2682         if (!force_nonblock || ret2 != -EAGAIN) {
2683                 /* IOPOLL retry should happen for io-wq threads */
2684                 if (ret2 == -EAGAIN && (req->ctx->flags & IORING_SETUP_IOPOLL))
2685                         goto copy_iov;
2686 done:
2687                 kiocb_done(req, ret2, issue_flags);
2688                 ret = IOU_ISSUE_SKIP_COMPLETE;
2689         } else {
2690 copy_iov:
2691                 iov_iter_restore(&s->iter, &s->iter_state);
2692                 ret = io_setup_async_rw(req, iovec, s, false);
2693                 return ret ?: -EAGAIN;
2694         }
2695 out_free:
2696         /* it's reportedly faster than delegating the null check to kfree() */
2697         if (iovec)
2698                 kfree(iovec);
2699         return ret;
2700 }
2701
2702 static __maybe_unused int io_eopnotsupp_prep(struct io_kiocb *kiocb,
2703                                              const struct io_uring_sqe *sqe)
2704 {
2705         return -EOPNOTSUPP;
2706 }
2707
2708 static int io_req_prep_async(struct io_kiocb *req)
2709 {
2710         const struct io_op_def *def = &io_op_defs[req->opcode];
2711
2712         /* assign early for deferred execution for non-fixed file */
2713         if (def->needs_file && !(req->flags & REQ_F_FIXED_FILE))
2714                 req->file = io_file_get_normal(req, req->cqe.fd);
2715         if (!def->prep_async)
2716                 return 0;
2717         if (WARN_ON_ONCE(req_has_async_data(req)))
2718                 return -EFAULT;
2719         if (io_alloc_async_data(req))
2720                 return -EAGAIN;
2721
2722         return def->prep_async(req);
2723 }
2724
2725 static u32 io_get_sequence(struct io_kiocb *req)
2726 {
2727         u32 seq = req->ctx->cached_sq_head;
2728         struct io_kiocb *cur;
2729
2730         /* need original cached_sq_head, but it was increased for each req */
2731         io_for_each_link(cur, req)
2732                 seq--;
2733         return seq;
2734 }
2735
2736 static __cold void io_drain_req(struct io_kiocb *req)
2737 {
2738         struct io_ring_ctx *ctx = req->ctx;
2739         struct io_defer_entry *de;
2740         int ret;
2741         u32 seq = io_get_sequence(req);
2742
2743         /* Still need defer if there is pending req in defer list. */
2744         spin_lock(&ctx->completion_lock);
2745         if (!req_need_defer(req, seq) && list_empty_careful(&ctx->defer_list)) {
2746                 spin_unlock(&ctx->completion_lock);
2747 queue:
2748                 ctx->drain_active = false;
2749                 io_req_task_queue(req);
2750                 return;
2751         }
2752         spin_unlock(&ctx->completion_lock);
2753
2754         ret = io_req_prep_async(req);
2755         if (ret) {
2756 fail:
2757                 io_req_complete_failed(req, ret);
2758                 return;
2759         }
2760         io_prep_async_link(req);
2761         de = kmalloc(sizeof(*de), GFP_KERNEL);
2762         if (!de) {
2763                 ret = -ENOMEM;
2764                 goto fail;
2765         }
2766
2767         spin_lock(&ctx->completion_lock);
2768         if (!req_need_defer(req, seq) && list_empty(&ctx->defer_list)) {
2769                 spin_unlock(&ctx->completion_lock);
2770                 kfree(de);
2771                 goto queue;
2772         }
2773
2774         trace_io_uring_defer(ctx, req, req->cqe.user_data, req->opcode);
2775         de->req = req;
2776         de->seq = seq;
2777         list_add_tail(&de->list, &ctx->defer_list);
2778         spin_unlock(&ctx->completion_lock);
2779 }
2780
2781 static void io_clean_op(struct io_kiocb *req)
2782 {
2783         if (req->flags & REQ_F_BUFFER_SELECTED) {
2784                 spin_lock(&req->ctx->completion_lock);
2785                 io_put_kbuf_comp(req);
2786                 spin_unlock(&req->ctx->completion_lock);
2787         }
2788
2789         if (req->flags & REQ_F_NEED_CLEANUP) {
2790                 const struct io_op_def *def = &io_op_defs[req->opcode];
2791
2792                 if (def->cleanup)
2793                         def->cleanup(req);
2794         }
2795         if ((req->flags & REQ_F_POLLED) && req->apoll) {
2796                 kfree(req->apoll->double_poll);
2797                 kfree(req->apoll);
2798                 req->apoll = NULL;
2799         }
2800         if (req->flags & REQ_F_INFLIGHT) {
2801                 struct io_uring_task *tctx = req->task->io_uring;
2802
2803                 atomic_dec(&tctx->inflight_tracked);
2804         }
2805         if (req->flags & REQ_F_CREDS)
2806                 put_cred(req->creds);
2807         if (req->flags & REQ_F_ASYNC_DATA) {
2808                 kfree(req->async_data);
2809                 req->async_data = NULL;
2810         }
2811         req->flags &= ~IO_REQ_CLEAN_FLAGS;
2812 }
2813
2814 static bool io_assign_file(struct io_kiocb *req, unsigned int issue_flags)
2815 {
2816         if (req->file || !io_op_defs[req->opcode].needs_file)
2817                 return true;
2818
2819         if (req->flags & REQ_F_FIXED_FILE)
2820                 req->file = io_file_get_fixed(req, req->cqe.fd, issue_flags);
2821         else
2822                 req->file = io_file_get_normal(req, req->cqe.fd);
2823
2824         return !!req->file;
2825 }
2826
2827 static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
2828 {
2829         const struct io_op_def *def = &io_op_defs[req->opcode];
2830         const struct cred *creds = NULL;
2831         int ret;
2832
2833         if (unlikely(!io_assign_file(req, issue_flags)))
2834                 return -EBADF;
2835
2836         if (unlikely((req->flags & REQ_F_CREDS) && req->creds != current_cred()))
2837                 creds = override_creds(req->creds);
2838
2839         if (!def->audit_skip)
2840                 audit_uring_entry(req->opcode);
2841
2842         ret = def->issue(req, issue_flags);
2843
2844         if (!def->audit_skip)
2845                 audit_uring_exit(!ret, ret);
2846
2847         if (creds)
2848                 revert_creds(creds);
2849
2850         if (ret == IOU_OK)
2851                 __io_req_complete(req, issue_flags);
2852         else if (ret != IOU_ISSUE_SKIP_COMPLETE)
2853                 return ret;
2854
2855         /* If the op doesn't have a file, we're not polling for it */
2856         if ((req->ctx->flags & IORING_SETUP_IOPOLL) && req->file)
2857                 io_iopoll_req_issued(req, issue_flags);
2858
2859         return 0;
2860 }
2861
2862 int io_poll_issue(struct io_kiocb *req, bool *locked)
2863 {
2864         io_tw_lock(req->ctx, locked);
2865         if (unlikely(req->task->flags & PF_EXITING))
2866                 return -EFAULT;
2867         return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
2868 }
2869
2870 struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
2871 {
2872         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2873
2874         req = io_put_req_find_next(req);
2875         return req ? &req->work : NULL;
2876 }
2877
2878 void io_wq_submit_work(struct io_wq_work *work)
2879 {
2880         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2881         const struct io_op_def *def = &io_op_defs[req->opcode];
2882         unsigned int issue_flags = IO_URING_F_UNLOCKED;
2883         bool needs_poll = false;
2884         int ret = 0, err = -ECANCELED;
2885
2886         /* one will be dropped by ->io_free_work() after returning to io-wq */
2887         if (!(req->flags & REQ_F_REFCOUNT))
2888                 __io_req_set_refcount(req, 2);
2889         else
2890                 req_ref_get(req);
2891
2892         io_arm_ltimeout(req);
2893
2894         /* either cancelled or io-wq is dying, so don't touch tctx->iowq */
2895         if (work->flags & IO_WQ_WORK_CANCEL) {
2896 fail:
2897                 io_req_task_queue_fail(req, err);
2898                 return;
2899         }
2900         if (!io_assign_file(req, issue_flags)) {
2901                 err = -EBADF;
2902                 work->flags |= IO_WQ_WORK_CANCEL;
2903                 goto fail;
2904         }
2905
2906         if (req->flags & REQ_F_FORCE_ASYNC) {
2907                 bool opcode_poll = def->pollin || def->pollout;
2908
2909                 if (opcode_poll && file_can_poll(req->file)) {
2910                         needs_poll = true;
2911                         issue_flags |= IO_URING_F_NONBLOCK;
2912                 }
2913         }
2914
2915         do {
2916                 ret = io_issue_sqe(req, issue_flags);
2917                 if (ret != -EAGAIN)
2918                         break;
2919                 /*
2920                  * We can get EAGAIN for iopolled IO even though we're
2921                  * forcing a sync submission from here, since we can't
2922                  * wait for request slots on the block side.
2923                  */
2924                 if (!needs_poll) {
2925                         if (!(req->ctx->flags & IORING_SETUP_IOPOLL))
2926                                 break;
2927                         cond_resched();
2928                         continue;
2929                 }
2930
2931                 if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK)
2932                         return;
2933                 /* aborted or ready, in either case retry blocking */
2934                 needs_poll = false;
2935                 issue_flags &= ~IO_URING_F_NONBLOCK;
2936         } while (1);
2937
2938         /* avoid locking problems by failing it from a clean context */
2939         if (ret < 0)
2940                 io_req_task_queue_fail(req, ret);
2941 }
2942
2943 inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
2944                                       unsigned int issue_flags)
2945 {
2946         struct io_ring_ctx *ctx = req->ctx;
2947         struct file *file = NULL;
2948         unsigned long file_ptr;
2949
2950         io_ring_submit_lock(ctx, issue_flags);
2951
2952         if (unlikely((unsigned int)fd >= ctx->nr_user_files))
2953                 goto out;
2954         fd = array_index_nospec(fd, ctx->nr_user_files);
2955         file_ptr = io_fixed_file_slot(&ctx->file_table, fd)->file_ptr;
2956         file = (struct file *) (file_ptr & FFS_MASK);
2957         file_ptr &= ~FFS_MASK;
2958         /* mask in overlapping REQ_F and FFS bits */
2959         req->flags |= (file_ptr << REQ_F_SUPPORT_NOWAIT_BIT);
2960         io_req_set_rsrc_node(req, ctx, 0);
2961         WARN_ON_ONCE(file && !test_bit(fd, ctx->file_table.bitmap));
2962 out:
2963         io_ring_submit_unlock(ctx, issue_flags);
2964         return file;
2965 }
2966
2967 struct file *io_file_get_normal(struct io_kiocb *req, int fd)
2968 {
2969         struct file *file = fget(fd);
2970
2971         trace_io_uring_file_get(req->ctx, req, req->cqe.user_data, fd);
2972
2973         /* we don't allow fixed io_uring files */
2974         if (file && io_is_uring_fops(file))
2975                 io_req_track_inflight(req);
2976         return file;
2977 }
2978
2979 static void io_queue_async(struct io_kiocb *req, int ret)
2980         __must_hold(&req->ctx->uring_lock)
2981 {
2982         struct io_kiocb *linked_timeout;
2983
2984         if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
2985                 io_req_complete_failed(req, ret);
2986                 return;
2987         }
2988
2989         linked_timeout = io_prep_linked_timeout(req);
2990
2991         switch (io_arm_poll_handler(req, 0)) {
2992         case IO_APOLL_READY:
2993                 io_req_task_queue(req);
2994                 break;
2995         case IO_APOLL_ABORTED:
2996                 /*
2997                  * Queued up for async execution, worker will release
2998                  * submit reference when the iocb is actually submitted.
2999                  */
3000                 io_kbuf_recycle(req, 0);
3001                 io_queue_iowq(req, NULL);
3002                 break;
3003         case IO_APOLL_OK:
3004                 break;
3005         }
3006
3007         if (linked_timeout)
3008                 io_queue_linked_timeout(linked_timeout);
3009 }
3010
3011 static inline void io_queue_sqe(struct io_kiocb *req)
3012         __must_hold(&req->ctx->uring_lock)
3013 {
3014         int ret;
3015
3016         ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
3017
3018         if (req->flags & REQ_F_COMPLETE_INLINE) {
3019                 io_req_add_compl_list(req);
3020                 return;
3021         }
3022         /*
3023          * We async punt it if the file wasn't marked NOWAIT, or if the file
3024          * doesn't support non-blocking read/write attempts
3025          */
3026         if (likely(!ret))
3027                 io_arm_ltimeout(req);
3028         else
3029                 io_queue_async(req, ret);
3030 }
3031
3032 static void io_queue_sqe_fallback(struct io_kiocb *req)
3033         __must_hold(&req->ctx->uring_lock)
3034 {
3035         if (unlikely(req->flags & REQ_F_FAIL)) {
3036                 /*
3037                  * We don't submit, fail them all, for that replace hardlinks
3038                  * with normal links. Extra REQ_F_LINK is tolerated.
3039                  */
3040                 req->flags &= ~REQ_F_HARDLINK;
3041                 req->flags |= REQ_F_LINK;
3042                 io_req_complete_failed(req, req->cqe.res);
3043         } else if (unlikely(req->ctx->drain_active)) {
3044                 io_drain_req(req);
3045         } else {
3046                 int ret = io_req_prep_async(req);
3047
3048                 if (unlikely(ret))
3049                         io_req_complete_failed(req, ret);
3050                 else
3051                         io_queue_iowq(req, NULL);
3052         }
3053 }
3054
3055 /*
3056  * Check SQE restrictions (opcode and flags).
3057  *
3058  * Returns 'true' if SQE is allowed, 'false' otherwise.
3059  */
3060 static inline bool io_check_restriction(struct io_ring_ctx *ctx,
3061                                         struct io_kiocb *req,
3062                                         unsigned int sqe_flags)
3063 {
3064         if (!test_bit(req->opcode, ctx->restrictions.sqe_op))
3065                 return false;
3066
3067         if ((sqe_flags & ctx->restrictions.sqe_flags_required) !=
3068             ctx->restrictions.sqe_flags_required)
3069                 return false;
3070
3071         if (sqe_flags & ~(ctx->restrictions.sqe_flags_allowed |
3072                           ctx->restrictions.sqe_flags_required))
3073                 return false;
3074
3075         return true;
3076 }
3077
3078 static void io_init_req_drain(struct io_kiocb *req)
3079 {
3080         struct io_ring_ctx *ctx = req->ctx;
3081         struct io_kiocb *head = ctx->submit_state.link.head;
3082
3083         ctx->drain_active = true;
3084         if (head) {
3085                 /*
3086                  * If we need to drain a request in the middle of a link, drain
3087                  * the head request and the next request/link after the current
3088                  * link. Considering sequential execution of links,
3089                  * REQ_F_IO_DRAIN will be maintained for every request of our
3090                  * link.
3091                  */
3092                 head->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
3093                 ctx->drain_next = true;
3094         }
3095 }
3096
3097 static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
3098                        const struct io_uring_sqe *sqe)
3099         __must_hold(&ctx->uring_lock)
3100 {
3101         const struct io_op_def *def;
3102         unsigned int sqe_flags;
3103         int personality;
3104         u8 opcode;
3105
3106         /* req is partially pre-initialised, see io_preinit_req() */
3107         req->opcode = opcode = READ_ONCE(sqe->opcode);
3108         /* same numerical values with corresponding REQ_F_*, safe to copy */
3109         req->flags = sqe_flags = READ_ONCE(sqe->flags);
3110         req->cqe.user_data = READ_ONCE(sqe->user_data);
3111         req->file = NULL;
3112         req->rsrc_node = NULL;
3113         req->task = current;
3114
3115         if (unlikely(opcode >= IORING_OP_LAST)) {
3116                 req->opcode = 0;
3117                 return -EINVAL;
3118         }
3119         def = &io_op_defs[opcode];
3120         if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) {
3121                 /* enforce forwards compatibility on users */
3122                 if (sqe_flags & ~SQE_VALID_FLAGS)
3123                         return -EINVAL;
3124                 if (sqe_flags & IOSQE_BUFFER_SELECT) {
3125                         if (!def->buffer_select)
3126                                 return -EOPNOTSUPP;
3127                         req->buf_index = READ_ONCE(sqe->buf_group);
3128                 }
3129                 if (sqe_flags & IOSQE_CQE_SKIP_SUCCESS)
3130                         ctx->drain_disabled = true;
3131                 if (sqe_flags & IOSQE_IO_DRAIN) {
3132                         if (ctx->drain_disabled)
3133                                 return -EOPNOTSUPP;
3134                         io_init_req_drain(req);
3135                 }
3136         }
3137         if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) {
3138                 if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags))
3139                         return -EACCES;
3140                 /* knock it to the slow queue path, will be drained there */
3141                 if (ctx->drain_active)
3142                         req->flags |= REQ_F_FORCE_ASYNC;
3143                 /* if there is no link, we're at "next" request and need to drain */
3144                 if (unlikely(ctx->drain_next) && !ctx->submit_state.link.head) {
3145                         ctx->drain_next = false;
3146                         ctx->drain_active = true;
3147                         req->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
3148                 }
3149         }
3150
3151         if (!def->ioprio && sqe->ioprio)
3152                 return -EINVAL;
3153         if (!def->iopoll && (ctx->flags & IORING_SETUP_IOPOLL))
3154                 return -EINVAL;
3155
3156         if (def->needs_file) {
3157                 struct io_submit_state *state = &ctx->submit_state;
3158
3159                 req->cqe.fd = READ_ONCE(sqe->fd);
3160
3161                 /*
3162                  * Plug now if we have more than 2 IO left after this, and the
3163                  * target is potentially a read/write to block based storage.
3164                  */
3165                 if (state->need_plug && def->plug) {
3166                         state->plug_started = true;
3167                         state->need_plug = false;
3168                         blk_start_plug_nr_ios(&state->plug, state->submit_nr);
3169                 }
3170         }
3171
3172         personality = READ_ONCE(sqe->personality);
3173         if (personality) {
3174                 int ret;
3175
3176                 req->creds = xa_load(&ctx->personalities, personality);
3177                 if (!req->creds)
3178                         return -EINVAL;
3179                 get_cred(req->creds);
3180                 ret = security_uring_override_creds(req->creds);
3181                 if (ret) {
3182                         put_cred(req->creds);
3183                         return ret;
3184                 }
3185                 req->flags |= REQ_F_CREDS;
3186         }
3187
3188         return def->prep(req, sqe);
3189 }
3190
3191 static __cold int io_submit_fail_init(const struct io_uring_sqe *sqe,
3192                                       struct io_kiocb *req, int ret)
3193 {
3194         struct io_ring_ctx *ctx = req->ctx;
3195         struct io_submit_link *link = &ctx->submit_state.link;
3196         struct io_kiocb *head = link->head;
3197
3198         trace_io_uring_req_failed(sqe, ctx, req, ret);
3199
3200         /*
3201          * Avoid breaking links in the middle as it renders links with SQPOLL
3202          * unusable. Instead of failing eagerly, continue assembling the link if
3203          * applicable and mark the head with REQ_F_FAIL. The link flushing code
3204          * should find the flag and handle the rest.
3205          */
3206         req_fail_link_node(req, ret);
3207         if (head && !(head->flags & REQ_F_FAIL))
3208                 req_fail_link_node(head, -ECANCELED);
3209
3210         if (!(req->flags & IO_REQ_LINK_FLAGS)) {
3211                 if (head) {
3212                         link->last->link = req;
3213                         link->head = NULL;
3214                         req = head;
3215                 }
3216                 io_queue_sqe_fallback(req);
3217                 return ret;
3218         }
3219
3220         if (head)
3221                 link->last->link = req;
3222         else
3223                 link->head = req;
3224         link->last = req;
3225         return 0;
3226 }
3227
3228 static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
3229                          const struct io_uring_sqe *sqe)
3230         __must_hold(&ctx->uring_lock)
3231 {
3232         struct io_submit_link *link = &ctx->submit_state.link;
3233         int ret;
3234
3235         ret = io_init_req(ctx, req, sqe);
3236         if (unlikely(ret))
3237                 return io_submit_fail_init(sqe, req, ret);
3238
3239         /* don't need @sqe from now on */
3240         trace_io_uring_submit_sqe(ctx, req, req->cqe.user_data, req->opcode,
3241                                   req->flags, true,
3242                                   ctx->flags & IORING_SETUP_SQPOLL);
3243
3244         /*
3245          * If we already have a head request, queue this one for async
3246          * submittal once the head completes. If we don't have a head but
3247          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
3248          * submitted sync once the chain is complete. If none of those
3249          * conditions are true (normal request), then just queue it.
3250          */
3251         if (unlikely(link->head)) {
3252                 ret = io_req_prep_async(req);
3253                 if (unlikely(ret))
3254                         return io_submit_fail_init(sqe, req, ret);
3255
3256                 trace_io_uring_link(ctx, req, link->head);
3257                 link->last->link = req;
3258                 link->last = req;
3259
3260                 if (req->flags & IO_REQ_LINK_FLAGS)
3261                         return 0;
3262                 /* last request of the link, flush it */
3263                 req = link->head;
3264                 link->head = NULL;
3265                 if (req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL))
3266                         goto fallback;
3267
3268         } else if (unlikely(req->flags & (IO_REQ_LINK_FLAGS |
3269                                           REQ_F_FORCE_ASYNC | REQ_F_FAIL))) {
3270                 if (req->flags & IO_REQ_LINK_FLAGS) {
3271                         link->head = req;
3272                         link->last = req;
3273                 } else {
3274 fallback:
3275                         io_queue_sqe_fallback(req);
3276                 }
3277                 return 0;
3278         }
3279
3280         io_queue_sqe(req);
3281         return 0;
3282 }
3283
3284 /*
3285  * Batched submission is done, ensure local IO is flushed out.
3286  */
3287 static void io_submit_state_end(struct io_ring_ctx *ctx)
3288 {
3289         struct io_submit_state *state = &ctx->submit_state;
3290
3291         if (unlikely(state->link.head))
3292                 io_queue_sqe_fallback(state->link.head);
3293         /* flush only after queuing links as they can generate completions */
3294         io_submit_flush_completions(ctx);
3295         if (state->plug_started)
3296                 blk_finish_plug(&state->plug);
3297 }
3298
3299 /*
3300  * Start submission side cache.
3301  */
3302 static void io_submit_state_start(struct io_submit_state *state,
3303                                   unsigned int max_ios)
3304 {
3305         state->plug_started = false;
3306         state->need_plug = max_ios > 2;
3307         state->submit_nr = max_ios;
3308         /* set only head, no need to init link_last in advance */
3309         state->link.head = NULL;
3310 }
3311
3312 static void io_commit_sqring(struct io_ring_ctx *ctx)
3313 {
3314         struct io_rings *rings = ctx->rings;
3315
3316         /*
3317          * Ensure any loads from the SQEs are done at this point,
3318          * since once we write the new head, the application could
3319          * write new data to them.
3320          */
3321         smp_store_release(&rings->sq.head, ctx->cached_sq_head);
3322 }
3323
3324 /*
3325  * Fetch an sqe, if one is available. Note this returns a pointer to memory
3326  * that is mapped by userspace. This means that care needs to be taken to
3327  * ensure that reads are stable, as we cannot rely on userspace always
3328  * being a good citizen. If members of the sqe are validated and then later
3329  * used, it's important that those reads are done through READ_ONCE() to
3330  * prevent a re-load down the line.
3331  */
3332 static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx)
3333 {
3334         unsigned head, mask = ctx->sq_entries - 1;
3335         unsigned sq_idx = ctx->cached_sq_head++ & mask;
3336
3337         /*
3338          * The cached sq head (or cq tail) serves two purposes:
3339          *
3340          * 1) allows us to batch the cost of updating the user visible
3341          *    head updates.
3342          * 2) allows the kernel side to track the head on its own, even
3343          *    though the application is the one updating it.
3344          */
3345         head = READ_ONCE(ctx->sq_array[sq_idx]);
3346         if (likely(head < ctx->sq_entries)) {
3347                 /* double index for 128-byte SQEs, twice as long */
3348                 if (ctx->flags & IORING_SETUP_SQE128)
3349                         head <<= 1;
3350                 return &ctx->sq_sqes[head];
3351         }
3352
3353         /* drop invalid entries */
3354         ctx->cq_extra--;
3355         WRITE_ONCE(ctx->rings->sq_dropped,
3356                    READ_ONCE(ctx->rings->sq_dropped) + 1);
3357         return NULL;
3358 }
3359
3360 int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
3361         __must_hold(&ctx->uring_lock)
3362 {
3363         unsigned int entries = io_sqring_entries(ctx);
3364         unsigned int left;
3365         int ret;
3366
3367         if (unlikely(!entries))
3368                 return 0;
3369         /* make sure SQ entry isn't read before tail */
3370         ret = left = min3(nr, ctx->sq_entries, entries);
3371         io_get_task_refs(left);
3372         io_submit_state_start(&ctx->submit_state, left);
3373
3374         do {
3375                 const struct io_uring_sqe *sqe;
3376                 struct io_kiocb *req;
3377
3378                 if (unlikely(!io_alloc_req_refill(ctx)))
3379                         break;
3380                 req = io_alloc_req(ctx);
3381                 sqe = io_get_sqe(ctx);
3382                 if (unlikely(!sqe)) {
3383                         io_req_add_to_cache(req, ctx);
3384                         break;
3385                 }
3386
3387                 /*
3388                  * Continue submitting even for sqe failure if the
3389                  * ring was setup with IORING_SETUP_SUBMIT_ALL
3390                  */
3391                 if (unlikely(io_submit_sqe(ctx, req, sqe)) &&
3392                     !(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
3393                         left--;
3394                         break;
3395                 }
3396         } while (--left);
3397
3398         if (unlikely(left)) {
3399                 ret -= left;
3400                 /* try again if it submitted nothing and can't allocate a req */
3401                 if (!ret && io_req_cache_empty(ctx))
3402                         ret = -EAGAIN;
3403                 current->io_uring->cached_refs += left;
3404         }
3405
3406         io_submit_state_end(ctx);
3407          /* Commit SQ ring head once we've consumed and submitted all SQEs */
3408         io_commit_sqring(ctx);
3409         return ret;
3410 }
3411
3412 struct io_wait_queue {
3413         struct wait_queue_entry wq;
3414         struct io_ring_ctx *ctx;
3415         unsigned cq_tail;
3416         unsigned nr_timeouts;
3417 };
3418
3419 static inline bool io_should_wake(struct io_wait_queue *iowq)
3420 {
3421         struct io_ring_ctx *ctx = iowq->ctx;
3422         int dist = ctx->cached_cq_tail - (int) iowq->cq_tail;
3423
3424         /*
3425          * Wake up if we have enough events, or if a timeout occurred since we
3426          * started waiting. For timeouts, we always want to return to userspace,
3427          * regardless of event count.
3428          */
3429         return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
3430 }
3431
3432 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
3433                             int wake_flags, void *key)
3434 {
3435         struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
3436                                                         wq);
3437
3438         /*
3439          * Cannot safely flush overflowed CQEs from here, ensure we wake up
3440          * the task, and the next invocation will do it.
3441          */
3442         if (io_should_wake(iowq) ||
3443             test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &iowq->ctx->check_cq))
3444                 return autoremove_wake_function(curr, mode, wake_flags, key);
3445         return -1;
3446 }
3447
3448 int io_run_task_work_sig(void)
3449 {
3450         if (io_run_task_work())
3451                 return 1;
3452         if (test_thread_flag(TIF_NOTIFY_SIGNAL))
3453                 return -ERESTARTSYS;
3454         if (task_sigpending(current))
3455                 return -EINTR;
3456         return 0;
3457 }
3458
3459 /* when returns >0, the caller should retry */
3460 static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
3461                                           struct io_wait_queue *iowq,
3462                                           ktime_t timeout)
3463 {
3464         int ret;
3465         unsigned long check_cq;
3466
3467         /* make sure we run task_work before checking for signals */
3468         ret = io_run_task_work_sig();
3469         if (ret || io_should_wake(iowq))
3470                 return ret;
3471         check_cq = READ_ONCE(ctx->check_cq);
3472         /* let the caller flush overflows, retry */
3473         if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
3474                 return 1;
3475         if (unlikely(check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)))
3476                 return -EBADR;
3477         if (!schedule_hrtimeout(&timeout, HRTIMER_MODE_ABS))
3478                 return -ETIME;
3479         return 1;
3480 }
3481
3482 /*
3483  * Wait until events become available, if we don't already have some. The
3484  * application must reap them itself, as they reside on the shared cq ring.
3485  */
3486 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
3487                           const sigset_t __user *sig, size_t sigsz,
3488                           struct __kernel_timespec __user *uts)
3489 {
3490         struct io_wait_queue iowq;
3491         struct io_rings *rings = ctx->rings;
3492         ktime_t timeout = KTIME_MAX;
3493         int ret;
3494
3495         do {
3496                 io_cqring_overflow_flush(ctx);
3497                 if (io_cqring_events(ctx) >= min_events)
3498                         return 0;
3499                 if (!io_run_task_work())
3500                         break;
3501         } while (1);
3502
3503         if (sig) {
3504 #ifdef CONFIG_COMPAT
3505                 if (in_compat_syscall())
3506                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
3507                                                       sigsz);
3508                 else
3509 #endif
3510                         ret = set_user_sigmask(sig, sigsz);
3511
3512                 if (ret)
3513                         return ret;
3514         }
3515
3516         if (uts) {
3517                 struct timespec64 ts;
3518
3519                 if (get_timespec64(&ts, uts))
3520                         return -EFAULT;
3521                 timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
3522         }
3523
3524         init_waitqueue_func_entry(&iowq.wq, io_wake_function);
3525         iowq.wq.private = current;
3526         INIT_LIST_HEAD(&iowq.wq.entry);
3527         iowq.ctx = ctx;
3528         iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
3529         iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
3530
3531         trace_io_uring_cqring_wait(ctx, min_events);
3532         do {
3533                 /* if we can't even flush overflow, don't wait for more */
3534                 if (!io_cqring_overflow_flush(ctx)) {
3535                         ret = -EBUSY;
3536                         break;
3537                 }
3538                 prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
3539                                                 TASK_INTERRUPTIBLE);
3540                 ret = io_cqring_wait_schedule(ctx, &iowq, timeout);
3541                 cond_resched();
3542         } while (ret > 0);
3543
3544         finish_wait(&ctx->cq_wait, &iowq.wq);
3545         restore_saved_sigmask_unless(ret == -EINTR);
3546
3547         return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
3548 }
3549
3550 static void io_mem_free(void *ptr)
3551 {
3552         struct page *page;
3553
3554         if (!ptr)
3555                 return;
3556
3557         page = virt_to_head_page(ptr);
3558         if (put_page_testzero(page))
3559                 free_compound_page(page);
3560 }
3561
3562 static void *io_mem_alloc(size_t size)
3563 {
3564         gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP;
3565
3566         return (void *) __get_free_pages(gfp, get_order(size));
3567 }
3568
3569 static unsigned long rings_size(struct io_ring_ctx *ctx, unsigned int sq_entries,
3570                                 unsigned int cq_entries, size_t *sq_offset)
3571 {
3572         struct io_rings *rings;
3573         size_t off, sq_array_size;
3574
3575         off = struct_size(rings, cqes, cq_entries);
3576         if (off == SIZE_MAX)
3577                 return SIZE_MAX;
3578         if (ctx->flags & IORING_SETUP_CQE32) {
3579                 if (check_shl_overflow(off, 1, &off))
3580                         return SIZE_MAX;
3581         }
3582
3583 #ifdef CONFIG_SMP
3584         off = ALIGN(off, SMP_CACHE_BYTES);
3585         if (off == 0)
3586                 return SIZE_MAX;
3587 #endif
3588
3589         if (sq_offset)
3590                 *sq_offset = off;
3591
3592         sq_array_size = array_size(sizeof(u32), sq_entries);
3593         if (sq_array_size == SIZE_MAX)
3594                 return SIZE_MAX;
3595
3596         if (check_add_overflow(off, sq_array_size, &off))
3597                 return SIZE_MAX;
3598
3599         return off;
3600 }
3601
3602 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg,
3603                                unsigned int eventfd_async)
3604 {
3605         struct io_ev_fd *ev_fd;
3606         __s32 __user *fds = arg;
3607         int fd;
3608
3609         ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
3610                                         lockdep_is_held(&ctx->uring_lock));
3611         if (ev_fd)
3612                 return -EBUSY;
3613
3614         if (copy_from_user(&fd, fds, sizeof(*fds)))
3615                 return -EFAULT;
3616
3617         ev_fd = kmalloc(sizeof(*ev_fd), GFP_KERNEL);
3618         if (!ev_fd)
3619                 return -ENOMEM;
3620
3621         ev_fd->cq_ev_fd = eventfd_ctx_fdget(fd);
3622         if (IS_ERR(ev_fd->cq_ev_fd)) {
3623                 int ret = PTR_ERR(ev_fd->cq_ev_fd);
3624                 kfree(ev_fd);
3625                 return ret;
3626         }
3627         ev_fd->eventfd_async = eventfd_async;
3628         ctx->has_evfd = true;
3629         rcu_assign_pointer(ctx->io_ev_fd, ev_fd);
3630         return 0;
3631 }
3632
3633 static void io_eventfd_put(struct rcu_head *rcu)
3634 {
3635         struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu);
3636
3637         eventfd_ctx_put(ev_fd->cq_ev_fd);
3638         kfree(ev_fd);
3639 }
3640
3641 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
3642 {
3643         struct io_ev_fd *ev_fd;
3644
3645         ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
3646                                         lockdep_is_held(&ctx->uring_lock));
3647         if (ev_fd) {
3648                 ctx->has_evfd = false;
3649                 rcu_assign_pointer(ctx->io_ev_fd, NULL);
3650                 call_rcu(&ev_fd->rcu, io_eventfd_put);
3651                 return 0;
3652         }
3653
3654         return -ENXIO;
3655 }
3656
3657 static void io_req_caches_free(struct io_ring_ctx *ctx)
3658 {
3659         struct io_submit_state *state = &ctx->submit_state;
3660         int nr = 0;
3661
3662         mutex_lock(&ctx->uring_lock);
3663         io_flush_cached_locked_reqs(ctx, state);
3664
3665         while (!io_req_cache_empty(ctx)) {
3666                 struct io_wq_work_node *node;
3667                 struct io_kiocb *req;
3668
3669                 node = wq_stack_extract(&state->free_list);
3670                 req = container_of(node, struct io_kiocb, comp_list);
3671                 kmem_cache_free(req_cachep, req);
3672                 nr++;
3673         }
3674         if (nr)
3675                 percpu_ref_put_many(&ctx->refs, nr);
3676         mutex_unlock(&ctx->uring_lock);
3677 }
3678
3679 static void io_flush_apoll_cache(struct io_ring_ctx *ctx)
3680 {
3681         struct async_poll *apoll;
3682
3683         while (!list_empty(&ctx->apoll_cache)) {
3684                 apoll = list_first_entry(&ctx->apoll_cache, struct async_poll,
3685                                                 poll.wait.entry);
3686                 list_del(&apoll->poll.wait.entry);
3687                 kfree(apoll);
3688         }
3689 }
3690
3691 static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
3692 {
3693         io_sq_thread_finish(ctx);
3694
3695         if (ctx->mm_account) {
3696                 mmdrop(ctx->mm_account);
3697                 ctx->mm_account = NULL;
3698         }
3699
3700         io_rsrc_refs_drop(ctx);
3701         /* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
3702         io_wait_rsrc_data(ctx->buf_data);
3703         io_wait_rsrc_data(ctx->file_data);
3704
3705         mutex_lock(&ctx->uring_lock);
3706         if (ctx->buf_data)
3707                 __io_sqe_buffers_unregister(ctx);
3708         if (ctx->file_data)
3709                 __io_sqe_files_unregister(ctx);
3710         if (ctx->rings)
3711                 __io_cqring_overflow_flush(ctx, true);
3712         io_eventfd_unregister(ctx);
3713         io_flush_apoll_cache(ctx);
3714         mutex_unlock(&ctx->uring_lock);
3715         io_destroy_buffers(ctx);
3716         if (ctx->sq_creds)
3717                 put_cred(ctx->sq_creds);
3718
3719         /* there are no registered resources left, nobody uses it */
3720         if (ctx->rsrc_node)
3721                 io_rsrc_node_destroy(ctx->rsrc_node);
3722         if (ctx->rsrc_backup_node)
3723                 io_rsrc_node_destroy(ctx->rsrc_backup_node);
3724         flush_delayed_work(&ctx->rsrc_put_work);
3725         flush_delayed_work(&ctx->fallback_work);
3726
3727         WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list));
3728         WARN_ON_ONCE(!llist_empty(&ctx->rsrc_put_llist));
3729
3730 #if defined(CONFIG_UNIX)
3731         if (ctx->ring_sock) {
3732                 ctx->ring_sock->file = NULL; /* so that iput() is called */
3733                 sock_release(ctx->ring_sock);
3734         }
3735 #endif
3736         WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
3737
3738         io_mem_free(ctx->rings);
3739         io_mem_free(ctx->sq_sqes);
3740
3741         percpu_ref_exit(&ctx->refs);
3742         free_uid(ctx->user);
3743         io_req_caches_free(ctx);
3744         if (ctx->hash_map)
3745                 io_wq_put_hash(ctx->hash_map);
3746         kfree(ctx->cancel_hash);
3747         kfree(ctx->dummy_ubuf);
3748         kfree(ctx->io_bl);
3749         xa_destroy(&ctx->io_bl_xa);
3750         kfree(ctx);
3751 }
3752
3753 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
3754 {
3755         struct io_ring_ctx *ctx = file->private_data;
3756         __poll_t mask = 0;
3757
3758         poll_wait(file, &ctx->cq_wait, wait);
3759         /*
3760          * synchronizes with barrier from wq_has_sleeper call in
3761          * io_commit_cqring
3762          */
3763         smp_rmb();
3764         if (!io_sqring_full(ctx))
3765                 mask |= EPOLLOUT | EPOLLWRNORM;
3766
3767         /*
3768          * Don't flush cqring overflow list here, just do a simple check.
3769          * Otherwise there could possible be ABBA deadlock:
3770          *      CPU0                    CPU1
3771          *      ----                    ----
3772          * lock(&ctx->uring_lock);
3773          *                              lock(&ep->mtx);
3774          *                              lock(&ctx->uring_lock);
3775          * lock(&ep->mtx);
3776          *
3777          * Users may get EPOLLIN meanwhile seeing nothing in cqring, this
3778          * pushs them to do the flush.
3779          */
3780         if (io_cqring_events(ctx) ||
3781             test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
3782                 mask |= EPOLLIN | EPOLLRDNORM;
3783
3784         return mask;
3785 }
3786
3787 static int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id)
3788 {
3789         const struct cred *creds;
3790
3791         creds = xa_erase(&ctx->personalities, id);
3792         if (creds) {
3793                 put_cred(creds);
3794                 return 0;
3795         }
3796
3797         return -EINVAL;
3798 }
3799
3800 struct io_tctx_exit {
3801         struct callback_head            task_work;
3802         struct completion               completion;
3803         struct io_ring_ctx              *ctx;
3804 };
3805
3806 static __cold void io_tctx_exit_cb(struct callback_head *cb)
3807 {
3808         struct io_uring_task *tctx = current->io_uring;
3809         struct io_tctx_exit *work;
3810
3811         work = container_of(cb, struct io_tctx_exit, task_work);
3812         /*
3813          * When @in_idle, we're in cancellation and it's racy to remove the
3814          * node. It'll be removed by the end of cancellation, just ignore it.
3815          */
3816         if (!atomic_read(&tctx->in_idle))
3817                 io_uring_del_tctx_node((unsigned long)work->ctx);
3818         complete(&work->completion);
3819 }
3820
3821 static __cold bool io_cancel_ctx_cb(struct io_wq_work *work, void *data)
3822 {
3823         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3824
3825         return req->ctx == data;
3826 }
3827
3828 static __cold void io_ring_exit_work(struct work_struct *work)
3829 {
3830         struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work);
3831         unsigned long timeout = jiffies + HZ * 60 * 5;
3832         unsigned long interval = HZ / 20;
3833         struct io_tctx_exit exit;
3834         struct io_tctx_node *node;
3835         int ret;
3836
3837         /*
3838          * If we're doing polled IO and end up having requests being
3839          * submitted async (out-of-line), then completions can come in while
3840          * we're waiting for refs to drop. We need to reap these manually,
3841          * as nobody else will be looking for them.
3842          */
3843         do {
3844                 io_uring_try_cancel_requests(ctx, NULL, true);
3845                 if (ctx->sq_data) {
3846                         struct io_sq_data *sqd = ctx->sq_data;
3847                         struct task_struct *tsk;
3848
3849                         io_sq_thread_park(sqd);
3850                         tsk = sqd->thread;
3851                         if (tsk && tsk->io_uring && tsk->io_uring->io_wq)
3852                                 io_wq_cancel_cb(tsk->io_uring->io_wq,
3853                                                 io_cancel_ctx_cb, ctx, true);
3854                         io_sq_thread_unpark(sqd);
3855                 }
3856
3857                 io_req_caches_free(ctx);
3858
3859                 if (WARN_ON_ONCE(time_after(jiffies, timeout))) {
3860                         /* there is little hope left, don't run it too often */
3861                         interval = HZ * 60;
3862                 }
3863         } while (!wait_for_completion_timeout(&ctx->ref_comp, interval));
3864
3865         init_completion(&exit.completion);
3866         init_task_work(&exit.task_work, io_tctx_exit_cb);
3867         exit.ctx = ctx;
3868         /*
3869          * Some may use context even when all refs and requests have been put,
3870          * and they are free to do so while still holding uring_lock or
3871          * completion_lock, see io_req_task_submit(). Apart from other work,
3872          * this lock/unlock section also waits them to finish.
3873          */
3874         mutex_lock(&ctx->uring_lock);
3875         while (!list_empty(&ctx->tctx_list)) {
3876                 WARN_ON_ONCE(time_after(jiffies, timeout));
3877
3878                 node = list_first_entry(&ctx->tctx_list, struct io_tctx_node,
3879                                         ctx_node);
3880                 /* don't spin on a single task if cancellation failed */
3881                 list_rotate_left(&ctx->tctx_list);
3882                 ret = task_work_add(node->task, &exit.task_work, TWA_SIGNAL);
3883                 if (WARN_ON_ONCE(ret))
3884                         continue;
3885
3886                 mutex_unlock(&ctx->uring_lock);
3887                 wait_for_completion(&exit.completion);
3888                 mutex_lock(&ctx->uring_lock);
3889         }
3890         mutex_unlock(&ctx->uring_lock);
3891         spin_lock(&ctx->completion_lock);
3892         spin_unlock(&ctx->completion_lock);
3893
3894         io_ring_ctx_free(ctx);
3895 }
3896
3897 static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
3898 {
3899         unsigned long index;
3900         struct creds *creds;
3901
3902         mutex_lock(&ctx->uring_lock);
3903         percpu_ref_kill(&ctx->refs);
3904         if (ctx->rings)
3905                 __io_cqring_overflow_flush(ctx, true);
3906         xa_for_each(&ctx->personalities, index, creds)
3907                 io_unregister_personality(ctx, index);
3908         mutex_unlock(&ctx->uring_lock);
3909
3910         /* failed during ring init, it couldn't have issued any requests */
3911         if (ctx->rings) {
3912                 io_kill_timeouts(ctx, NULL, true);
3913                 io_poll_remove_all(ctx, NULL, true);
3914                 /* if we failed setting up the ctx, we might not have any rings */
3915                 io_iopoll_try_reap_events(ctx);
3916         }
3917
3918         INIT_WORK(&ctx->exit_work, io_ring_exit_work);
3919         /*
3920          * Use system_unbound_wq to avoid spawning tons of event kworkers
3921          * if we're exiting a ton of rings at the same time. It just adds
3922          * noise and overhead, there's no discernable change in runtime
3923          * over using system_wq.
3924          */
3925         queue_work(system_unbound_wq, &ctx->exit_work);
3926 }
3927
3928 static int io_uring_release(struct inode *inode, struct file *file)
3929 {
3930         struct io_ring_ctx *ctx = file->private_data;
3931
3932         file->private_data = NULL;
3933         io_ring_ctx_wait_and_kill(ctx);
3934         return 0;
3935 }
3936
3937 struct io_task_cancel {
3938         struct task_struct *task;
3939         bool all;
3940 };
3941
3942 static bool io_cancel_task_cb(struct io_wq_work *work, void *data)
3943 {
3944         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3945         struct io_task_cancel *cancel = data;
3946
3947         return io_match_task_safe(req, cancel->task, cancel->all);
3948 }
3949
3950 static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
3951                                          struct task_struct *task,
3952                                          bool cancel_all)
3953 {
3954         struct io_defer_entry *de;
3955         LIST_HEAD(list);
3956
3957         spin_lock(&ctx->completion_lock);
3958         list_for_each_entry_reverse(de, &ctx->defer_list, list) {
3959                 if (io_match_task_safe(de->req, task, cancel_all)) {
3960                         list_cut_position(&list, &ctx->defer_list, &de->list);
3961                         break;
3962                 }
3963         }
3964         spin_unlock(&ctx->completion_lock);
3965         if (list_empty(&list))
3966                 return false;
3967
3968         while (!list_empty(&list)) {
3969                 de = list_first_entry(&list, struct io_defer_entry, list);
3970                 list_del_init(&de->list);
3971                 io_req_complete_failed(de->req, -ECANCELED);
3972                 kfree(de);
3973         }
3974         return true;
3975 }
3976
3977 static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
3978 {
3979         struct io_tctx_node *node;
3980         enum io_wq_cancel cret;
3981         bool ret = false;
3982
3983         mutex_lock(&ctx->uring_lock);
3984         list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
3985                 struct io_uring_task *tctx = node->task->io_uring;
3986
3987                 /*
3988                  * io_wq will stay alive while we hold uring_lock, because it's
3989                  * killed after ctx nodes, which requires to take the lock.
3990                  */
3991                 if (!tctx || !tctx->io_wq)
3992                         continue;
3993                 cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true);
3994                 ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
3995         }
3996         mutex_unlock(&ctx->uring_lock);
3997
3998         return ret;
3999 }
4000
4001 static __cold void io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
4002                                                 struct task_struct *task,
4003                                                 bool cancel_all)
4004 {
4005         struct io_task_cancel cancel = { .task = task, .all = cancel_all, };
4006         struct io_uring_task *tctx = task ? task->io_uring : NULL;
4007
4008         /* failed during ring init, it couldn't have issued any requests */
4009         if (!ctx->rings)
4010                 return;
4011
4012         while (1) {
4013                 enum io_wq_cancel cret;
4014                 bool ret = false;
4015
4016                 if (!task) {
4017                         ret |= io_uring_try_cancel_iowq(ctx);
4018                 } else if (tctx && tctx->io_wq) {
4019                         /*
4020                          * Cancels requests of all rings, not only @ctx, but
4021                          * it's fine as the task is in exit/exec.
4022                          */
4023                         cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_task_cb,
4024                                                &cancel, true);
4025                         ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
4026                 }
4027
4028                 /* SQPOLL thread does its own polling */
4029                 if ((!(ctx->flags & IORING_SETUP_SQPOLL) && cancel_all) ||
4030                     (ctx->sq_data && ctx->sq_data->thread == current)) {
4031                         while (!wq_list_empty(&ctx->iopoll_list)) {
4032                                 io_iopoll_try_reap_events(ctx);
4033                                 ret = true;
4034                         }
4035                 }
4036
4037                 ret |= io_cancel_defer_files(ctx, task, cancel_all);
4038                 ret |= io_poll_remove_all(ctx, task, cancel_all);
4039                 ret |= io_kill_timeouts(ctx, task, cancel_all);
4040                 if (task)
4041                         ret |= io_run_task_work();
4042                 if (!ret)
4043                         break;
4044                 cond_resched();
4045         }
4046 }
4047
4048 static s64 tctx_inflight(struct io_uring_task *tctx, bool tracked)
4049 {
4050         if (tracked)
4051                 return atomic_read(&tctx->inflight_tracked);
4052         return percpu_counter_sum(&tctx->inflight);
4053 }
4054
4055 /*
4056  * Find any io_uring ctx that this task has registered or done IO on, and cancel
4057  * requests. @sqd should be not-null IFF it's an SQPOLL thread cancellation.
4058  */
4059 __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
4060 {
4061         struct io_uring_task *tctx = current->io_uring;
4062         struct io_ring_ctx *ctx;
4063         s64 inflight;
4064         DEFINE_WAIT(wait);
4065
4066         WARN_ON_ONCE(sqd && sqd->thread != current);
4067
4068         if (!current->io_uring)
4069                 return;
4070         if (tctx->io_wq)
4071                 io_wq_exit_start(tctx->io_wq);
4072
4073         atomic_inc(&tctx->in_idle);
4074         do {
4075                 io_uring_drop_tctx_refs(current);
4076                 /* read completions before cancelations */
4077                 inflight = tctx_inflight(tctx, !cancel_all);
4078                 if (!inflight)
4079                         break;
4080
4081                 if (!sqd) {
4082                         struct io_tctx_node *node;
4083                         unsigned long index;
4084
4085                         xa_for_each(&tctx->xa, index, node) {
4086                                 /* sqpoll task will cancel all its requests */
4087                                 if (node->ctx->sq_data)
4088                                         continue;
4089                                 io_uring_try_cancel_requests(node->ctx, current,
4090                                                              cancel_all);
4091                         }
4092                 } else {
4093                         list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
4094                                 io_uring_try_cancel_requests(ctx, current,
4095                                                              cancel_all);
4096                 }
4097
4098                 prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
4099                 io_run_task_work();
4100                 io_uring_drop_tctx_refs(current);
4101
4102                 /*
4103                  * If we've seen completions, retry without waiting. This
4104                  * avoids a race where a completion comes in before we did
4105                  * prepare_to_wait().
4106                  */
4107                 if (inflight == tctx_inflight(tctx, !cancel_all))
4108                         schedule();
4109                 finish_wait(&tctx->wait, &wait);
4110         } while (1);
4111
4112         io_uring_clean_tctx(tctx);
4113         if (cancel_all) {
4114                 /*
4115                  * We shouldn't run task_works after cancel, so just leave
4116                  * ->in_idle set for normal exit.
4117                  */
4118                 atomic_dec(&tctx->in_idle);
4119                 /* for exec all current's requests should be gone, kill tctx */
4120                 __io_uring_free(current);
4121         }
4122 }
4123
4124 void __io_uring_cancel(bool cancel_all)
4125 {
4126         io_uring_cancel_generic(cancel_all, NULL);
4127 }
4128
4129 static void *io_uring_validate_mmap_request(struct file *file,
4130                                             loff_t pgoff, size_t sz)
4131 {
4132         struct io_ring_ctx *ctx = file->private_data;
4133         loff_t offset = pgoff << PAGE_SHIFT;
4134         struct page *page;
4135         void *ptr;
4136
4137         switch (offset) {
4138         case IORING_OFF_SQ_RING:
4139         case IORING_OFF_CQ_RING:
4140                 ptr = ctx->rings;
4141                 break;
4142         case IORING_OFF_SQES:
4143                 ptr = ctx->sq_sqes;
4144                 break;
4145         default:
4146                 return ERR_PTR(-EINVAL);
4147         }
4148
4149         page = virt_to_head_page(ptr);
4150         if (sz > page_size(page))
4151                 return ERR_PTR(-EINVAL);
4152
4153         return ptr;
4154 }
4155
4156 #ifdef CONFIG_MMU
4157
4158 static __cold int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
4159 {
4160         size_t sz = vma->vm_end - vma->vm_start;
4161         unsigned long pfn;
4162         void *ptr;
4163
4164         ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
4165         if (IS_ERR(ptr))
4166                 return PTR_ERR(ptr);
4167
4168         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
4169         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
4170 }
4171
4172 #else /* !CONFIG_MMU */
4173
4174 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
4175 {
4176         return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
4177 }
4178
4179 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
4180 {
4181         return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
4182 }
4183
4184 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
4185         unsigned long addr, unsigned long len,
4186         unsigned long pgoff, unsigned long flags)
4187 {
4188         void *ptr;
4189
4190         ptr = io_uring_validate_mmap_request(file, pgoff, len);
4191         if (IS_ERR(ptr))
4192                 return PTR_ERR(ptr);
4193
4194         return (unsigned long) ptr;
4195 }
4196
4197 #endif /* !CONFIG_MMU */
4198
4199 static int io_validate_ext_arg(unsigned flags, const void __user *argp, size_t argsz)
4200 {
4201         if (flags & IORING_ENTER_EXT_ARG) {
4202                 struct io_uring_getevents_arg arg;
4203
4204                 if (argsz != sizeof(arg))
4205                         return -EINVAL;
4206                 if (copy_from_user(&arg, argp, sizeof(arg)))
4207                         return -EFAULT;
4208         }
4209         return 0;
4210 }
4211
4212 static int io_get_ext_arg(unsigned flags, const void __user *argp, size_t *argsz,
4213                           struct __kernel_timespec __user **ts,
4214                           const sigset_t __user **sig)
4215 {
4216         struct io_uring_getevents_arg arg;
4217
4218         /*
4219          * If EXT_ARG isn't set, then we have no timespec and the argp pointer
4220          * is just a pointer to the sigset_t.
4221          */
4222         if (!(flags & IORING_ENTER_EXT_ARG)) {
4223                 *sig = (const sigset_t __user *) argp;
4224                 *ts = NULL;
4225                 return 0;
4226         }
4227
4228         /*
4229          * EXT_ARG is set - ensure we agree on the size of it and copy in our
4230          * timespec and sigset_t pointers if good.
4231          */
4232         if (*argsz != sizeof(arg))
4233                 return -EINVAL;
4234         if (copy_from_user(&arg, argp, sizeof(arg)))
4235                 return -EFAULT;
4236         if (arg.pad)
4237                 return -EINVAL;
4238         *sig = u64_to_user_ptr(arg.sigmask);
4239         *argsz = arg.sigmask_sz;
4240         *ts = u64_to_user_ptr(arg.ts);
4241         return 0;
4242 }
4243
4244 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
4245                 u32, min_complete, u32, flags, const void __user *, argp,
4246                 size_t, argsz)
4247 {
4248         struct io_ring_ctx *ctx;
4249         struct fd f;
4250         long ret;
4251
4252         io_run_task_work();
4253
4254         if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
4255                                IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
4256                                IORING_ENTER_REGISTERED_RING)))
4257                 return -EINVAL;
4258
4259         /*
4260          * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
4261          * need only dereference our task private array to find it.
4262          */
4263         if (flags & IORING_ENTER_REGISTERED_RING) {
4264                 struct io_uring_task *tctx = current->io_uring;
4265
4266                 if (!tctx || fd >= IO_RINGFD_REG_MAX)
4267                         return -EINVAL;
4268                 fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
4269                 f.file = tctx->registered_rings[fd];
4270                 f.flags = 0;
4271         } else {
4272                 f = fdget(fd);
4273         }
4274
4275         if (unlikely(!f.file))
4276                 return -EBADF;
4277
4278         ret = -EOPNOTSUPP;
4279         if (unlikely(!io_is_uring_fops(f.file)))
4280                 goto out_fput;
4281
4282         ret = -ENXIO;
4283         ctx = f.file->private_data;
4284         if (unlikely(!percpu_ref_tryget(&ctx->refs)))
4285                 goto out_fput;
4286
4287         ret = -EBADFD;
4288         if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED))
4289                 goto out;
4290
4291         /*
4292          * For SQ polling, the thread will do all submissions and completions.
4293          * Just return the requested submit count, and wake the thread if
4294          * we were asked to.
4295          */
4296         ret = 0;
4297         if (ctx->flags & IORING_SETUP_SQPOLL) {
4298                 io_cqring_overflow_flush(ctx);
4299
4300                 if (unlikely(ctx->sq_data->thread == NULL)) {
4301                         ret = -EOWNERDEAD;
4302                         goto out;
4303                 }
4304                 if (flags & IORING_ENTER_SQ_WAKEUP)
4305                         wake_up(&ctx->sq_data->wait);
4306                 if (flags & IORING_ENTER_SQ_WAIT) {
4307                         ret = io_sqpoll_wait_sq(ctx);
4308                         if (ret)
4309                                 goto out;
4310                 }
4311                 ret = to_submit;
4312         } else if (to_submit) {
4313                 ret = io_uring_add_tctx_node(ctx);
4314                 if (unlikely(ret))
4315                         goto out;
4316
4317                 mutex_lock(&ctx->uring_lock);
4318                 ret = io_submit_sqes(ctx, to_submit);
4319                 if (ret != to_submit) {
4320                         mutex_unlock(&ctx->uring_lock);
4321                         goto out;
4322                 }
4323                 if ((flags & IORING_ENTER_GETEVENTS) && ctx->syscall_iopoll)
4324                         goto iopoll_locked;
4325                 mutex_unlock(&ctx->uring_lock);
4326         }
4327         if (flags & IORING_ENTER_GETEVENTS) {
4328                 int ret2;
4329                 if (ctx->syscall_iopoll) {
4330                         /*
4331                          * We disallow the app entering submit/complete with
4332                          * polling, but we still need to lock the ring to
4333                          * prevent racing with polled issue that got punted to
4334                          * a workqueue.
4335                          */
4336                         mutex_lock(&ctx->uring_lock);
4337 iopoll_locked:
4338                         ret2 = io_validate_ext_arg(flags, argp, argsz);
4339                         if (likely(!ret2)) {
4340                                 min_complete = min(min_complete,
4341                                                    ctx->cq_entries);
4342                                 ret2 = io_iopoll_check(ctx, min_complete);
4343                         }
4344                         mutex_unlock(&ctx->uring_lock);
4345                 } else {
4346                         const sigset_t __user *sig;
4347                         struct __kernel_timespec __user *ts;
4348
4349                         ret2 = io_get_ext_arg(flags, argp, &argsz, &ts, &sig);
4350                         if (likely(!ret2)) {
4351                                 min_complete = min(min_complete,
4352                                                    ctx->cq_entries);
4353                                 ret2 = io_cqring_wait(ctx, min_complete, sig,
4354                                                       argsz, ts);
4355                         }
4356                 }
4357
4358                 if (!ret) {
4359                         ret = ret2;
4360
4361                         /*
4362                          * EBADR indicates that one or more CQE were dropped.
4363                          * Once the user has been informed we can clear the bit
4364                          * as they are obviously ok with those drops.
4365                          */
4366                         if (unlikely(ret2 == -EBADR))
4367                                 clear_bit(IO_CHECK_CQ_DROPPED_BIT,
4368                                           &ctx->check_cq);
4369                 }
4370         }
4371
4372 out:
4373         percpu_ref_put(&ctx->refs);
4374 out_fput:
4375         fdput(f);
4376         return ret;
4377 }
4378
4379 static const struct file_operations io_uring_fops = {
4380         .release        = io_uring_release,
4381         .mmap           = io_uring_mmap,
4382 #ifndef CONFIG_MMU
4383         .get_unmapped_area = io_uring_nommu_get_unmapped_area,
4384         .mmap_capabilities = io_uring_nommu_mmap_capabilities,
4385 #endif
4386         .poll           = io_uring_poll,
4387 #ifdef CONFIG_PROC_FS
4388         .show_fdinfo    = io_uring_show_fdinfo,
4389 #endif
4390 };
4391
4392 bool io_is_uring_fops(struct file *file)
4393 {
4394         return file->f_op == &io_uring_fops;
4395 }
4396
4397 static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx,
4398                                          struct io_uring_params *p)
4399 {
4400         struct io_rings *rings;
4401         size_t size, sq_array_offset;
4402
4403         /* make sure these are sane, as we already accounted them */
4404         ctx->sq_entries = p->sq_entries;
4405         ctx->cq_entries = p->cq_entries;
4406
4407         size = rings_size(ctx, p->sq_entries, p->cq_entries, &sq_array_offset);
4408         if (size == SIZE_MAX)
4409                 return -EOVERFLOW;
4410
4411         rings = io_mem_alloc(size);
4412         if (!rings)
4413                 return -ENOMEM;
4414
4415         ctx->rings = rings;
4416         ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
4417         rings->sq_ring_mask = p->sq_entries - 1;
4418         rings->cq_ring_mask = p->cq_entries - 1;
4419         rings->sq_ring_entries = p->sq_entries;
4420         rings->cq_ring_entries = p->cq_entries;
4421
4422         if (p->flags & IORING_SETUP_SQE128)
4423                 size = array_size(2 * sizeof(struct io_uring_sqe), p->sq_entries);
4424         else
4425                 size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
4426         if (size == SIZE_MAX) {
4427                 io_mem_free(ctx->rings);
4428                 ctx->rings = NULL;
4429                 return -EOVERFLOW;
4430         }
4431
4432         ctx->sq_sqes = io_mem_alloc(size);
4433         if (!ctx->sq_sqes) {
4434                 io_mem_free(ctx->rings);
4435                 ctx->rings = NULL;
4436                 return -ENOMEM;
4437         }
4438
4439         return 0;
4440 }
4441
4442 static int io_uring_install_fd(struct io_ring_ctx *ctx, struct file *file)
4443 {
4444         int ret, fd;
4445
4446         fd = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
4447         if (fd < 0)
4448                 return fd;
4449
4450         ret = io_uring_add_tctx_node(ctx);
4451         if (ret) {
4452                 put_unused_fd(fd);
4453                 return ret;
4454         }
4455         fd_install(fd, file);
4456         return fd;
4457 }
4458
4459 /*
4460  * Allocate an anonymous fd, this is what constitutes the application
4461  * visible backing of an io_uring instance. The application mmaps this
4462  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
4463  * we have to tie this fd to a socket for file garbage collection purposes.
4464  */
4465 static struct file *io_uring_get_file(struct io_ring_ctx *ctx)
4466 {
4467         struct file *file;
4468 #if defined(CONFIG_UNIX)
4469         int ret;
4470
4471         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
4472                                 &ctx->ring_sock);
4473         if (ret)
4474                 return ERR_PTR(ret);
4475 #endif
4476
4477         file = anon_inode_getfile_secure("[io_uring]", &io_uring_fops, ctx,
4478                                          O_RDWR | O_CLOEXEC, NULL);
4479 #if defined(CONFIG_UNIX)
4480         if (IS_ERR(file)) {
4481                 sock_release(ctx->ring_sock);
4482                 ctx->ring_sock = NULL;
4483         } else {
4484                 ctx->ring_sock->file = file;
4485         }
4486 #endif
4487         return file;
4488 }
4489
4490 static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
4491                                   struct io_uring_params __user *params)
4492 {
4493         struct io_ring_ctx *ctx;
4494         struct file *file;
4495         int ret;
4496
4497         if (!entries)
4498                 return -EINVAL;
4499         if (entries > IORING_MAX_ENTRIES) {
4500                 if (!(p->flags & IORING_SETUP_CLAMP))
4501                         return -EINVAL;
4502                 entries = IORING_MAX_ENTRIES;
4503         }
4504
4505         /*
4506          * Use twice as many entries for the CQ ring. It's possible for the
4507          * application to drive a higher depth than the size of the SQ ring,
4508          * since the sqes are only used at submission time. This allows for
4509          * some flexibility in overcommitting a bit. If the application has
4510          * set IORING_SETUP_CQSIZE, it will have passed in the desired number
4511          * of CQ ring entries manually.
4512          */
4513         p->sq_entries = roundup_pow_of_two(entries);
4514         if (p->flags & IORING_SETUP_CQSIZE) {
4515                 /*
4516                  * If IORING_SETUP_CQSIZE is set, we do the same roundup
4517                  * to a power-of-two, if it isn't already. We do NOT impose
4518                  * any cq vs sq ring sizing.
4519                  */
4520                 if (!p->cq_entries)
4521                         return -EINVAL;
4522                 if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
4523                         if (!(p->flags & IORING_SETUP_CLAMP))
4524                                 return -EINVAL;
4525                         p->cq_entries = IORING_MAX_CQ_ENTRIES;
4526                 }
4527                 p->cq_entries = roundup_pow_of_two(p->cq_entries);
4528                 if (p->cq_entries < p->sq_entries)
4529                         return -EINVAL;
4530         } else {
4531                 p->cq_entries = 2 * p->sq_entries;
4532         }
4533
4534         ctx = io_ring_ctx_alloc(p);
4535         if (!ctx)
4536                 return -ENOMEM;
4537
4538         /*
4539          * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
4540          * space applications don't need to do io completion events
4541          * polling again, they can rely on io_sq_thread to do polling
4542          * work, which can reduce cpu usage and uring_lock contention.
4543          */
4544         if (ctx->flags & IORING_SETUP_IOPOLL &&
4545             !(ctx->flags & IORING_SETUP_SQPOLL))
4546                 ctx->syscall_iopoll = 1;
4547
4548         ctx->compat = in_compat_syscall();
4549         if (!capable(CAP_IPC_LOCK))
4550                 ctx->user = get_uid(current_user());
4551
4552         /*
4553          * For SQPOLL, we just need a wakeup, always. For !SQPOLL, if
4554          * COOP_TASKRUN is set, then IPIs are never needed by the app.
4555          */
4556         ret = -EINVAL;
4557         if (ctx->flags & IORING_SETUP_SQPOLL) {
4558                 /* IPI related flags don't make sense with SQPOLL */
4559                 if (ctx->flags & (IORING_SETUP_COOP_TASKRUN |
4560                                   IORING_SETUP_TASKRUN_FLAG))
4561                         goto err;
4562                 ctx->notify_method = TWA_SIGNAL_NO_IPI;
4563         } else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) {
4564                 ctx->notify_method = TWA_SIGNAL_NO_IPI;
4565         } else {
4566                 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
4567                         goto err;
4568                 ctx->notify_method = TWA_SIGNAL;
4569         }
4570
4571         /*
4572          * This is just grabbed for accounting purposes. When a process exits,
4573          * the mm is exited and dropped before the files, hence we need to hang
4574          * on to this mm purely for the purposes of being able to unaccount
4575          * memory (locked/pinned vm). It's not used for anything else.
4576          */
4577         mmgrab(current->mm);
4578         ctx->mm_account = current->mm;
4579
4580         ret = io_allocate_scq_urings(ctx, p);
4581         if (ret)
4582                 goto err;
4583
4584         ret = io_sq_offload_create(ctx, p);
4585         if (ret)
4586                 goto err;
4587         /* always set a rsrc node */
4588         ret = io_rsrc_node_switch_start(ctx);
4589         if (ret)
4590                 goto err;
4591         io_rsrc_node_switch(ctx, NULL);
4592
4593         memset(&p->sq_off, 0, sizeof(p->sq_off));
4594         p->sq_off.head = offsetof(struct io_rings, sq.head);
4595         p->sq_off.tail = offsetof(struct io_rings, sq.tail);
4596         p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
4597         p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
4598         p->sq_off.flags = offsetof(struct io_rings, sq_flags);
4599         p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
4600         p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
4601
4602         memset(&p->cq_off, 0, sizeof(p->cq_off));
4603         p->cq_off.head = offsetof(struct io_rings, cq.head);
4604         p->cq_off.tail = offsetof(struct io_rings, cq.tail);
4605         p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
4606         p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
4607         p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
4608         p->cq_off.cqes = offsetof(struct io_rings, cqes);
4609         p->cq_off.flags = offsetof(struct io_rings, cq_flags);
4610
4611         p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
4612                         IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
4613                         IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
4614                         IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
4615                         IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
4616                         IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
4617                         IORING_FEAT_LINKED_FILE;
4618
4619         if (copy_to_user(params, p, sizeof(*p))) {
4620                 ret = -EFAULT;
4621                 goto err;
4622         }
4623
4624         file = io_uring_get_file(ctx);
4625         if (IS_ERR(file)) {
4626                 ret = PTR_ERR(file);
4627                 goto err;
4628         }
4629
4630         /*
4631          * Install ring fd as the very last thing, so we don't risk someone
4632          * having closed it before we finish setup
4633          */
4634         ret = io_uring_install_fd(ctx, file);
4635         if (ret < 0) {
4636                 /* fput will clean it up */
4637                 fput(file);
4638                 return ret;
4639         }
4640
4641         trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
4642         return ret;
4643 err:
4644         io_ring_ctx_wait_and_kill(ctx);
4645         return ret;
4646 }
4647
4648 /*
4649  * Sets up an aio uring context, and returns the fd. Applications asks for a
4650  * ring size, we return the actual sq/cq ring sizes (among other things) in the
4651  * params structure passed in.
4652  */
4653 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
4654 {
4655         struct io_uring_params p;
4656         int i;
4657
4658         if (copy_from_user(&p, params, sizeof(p)))
4659                 return -EFAULT;
4660         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
4661                 if (p.resv[i])
4662                         return -EINVAL;
4663         }
4664
4665         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
4666                         IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
4667                         IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
4668                         IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
4669                         IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
4670                         IORING_SETUP_SQE128 | IORING_SETUP_CQE32))
4671                 return -EINVAL;
4672
4673         return io_uring_create(entries, &p, params);
4674 }
4675
4676 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
4677                 struct io_uring_params __user *, params)
4678 {
4679         return io_uring_setup(entries, params);
4680 }
4681
4682 static __cold int io_probe(struct io_ring_ctx *ctx, void __user *arg,
4683                            unsigned nr_args)
4684 {
4685         struct io_uring_probe *p;
4686         size_t size;
4687         int i, ret;
4688
4689         size = struct_size(p, ops, nr_args);
4690         if (size == SIZE_MAX)
4691                 return -EOVERFLOW;
4692         p = kzalloc(size, GFP_KERNEL);
4693         if (!p)
4694                 return -ENOMEM;
4695
4696         ret = -EFAULT;
4697         if (copy_from_user(p, arg, size))
4698                 goto out;
4699         ret = -EINVAL;
4700         if (memchr_inv(p, 0, size))
4701                 goto out;
4702
4703         p->last_op = IORING_OP_LAST - 1;
4704         if (nr_args > IORING_OP_LAST)
4705                 nr_args = IORING_OP_LAST;
4706
4707         for (i = 0; i < nr_args; i++) {
4708                 p->ops[i].op = i;
4709                 if (!io_op_defs[i].not_supported)
4710                         p->ops[i].flags = IO_URING_OP_SUPPORTED;
4711         }
4712         p->ops_len = i;
4713
4714         ret = 0;
4715         if (copy_to_user(arg, p, size))
4716                 ret = -EFAULT;
4717 out:
4718         kfree(p);
4719         return ret;
4720 }
4721
4722 static int io_register_personality(struct io_ring_ctx *ctx)
4723 {
4724         const struct cred *creds;
4725         u32 id;
4726         int ret;
4727
4728         creds = get_current_cred();
4729
4730         ret = xa_alloc_cyclic(&ctx->personalities, &id, (void *)creds,
4731                         XA_LIMIT(0, USHRT_MAX), &ctx->pers_next, GFP_KERNEL);
4732         if (ret < 0) {
4733                 put_cred(creds);
4734                 return ret;
4735         }
4736         return id;
4737 }
4738
4739 static __cold int io_register_restrictions(struct io_ring_ctx *ctx,
4740                                            void __user *arg, unsigned int nr_args)
4741 {
4742         struct io_uring_restriction *res;
4743         size_t size;
4744         int i, ret;
4745
4746         /* Restrictions allowed only if rings started disabled */
4747         if (!(ctx->flags & IORING_SETUP_R_DISABLED))
4748                 return -EBADFD;
4749
4750         /* We allow only a single restrictions registration */
4751         if (ctx->restrictions.registered)
4752                 return -EBUSY;
4753
4754         if (!arg || nr_args > IORING_MAX_RESTRICTIONS)
4755                 return -EINVAL;
4756
4757         size = array_size(nr_args, sizeof(*res));
4758         if (size == SIZE_MAX)
4759                 return -EOVERFLOW;
4760
4761         res = memdup_user(arg, size);
4762         if (IS_ERR(res))
4763                 return PTR_ERR(res);
4764
4765         ret = 0;
4766
4767         for (i = 0; i < nr_args; i++) {
4768                 switch (res[i].opcode) {
4769                 case IORING_RESTRICTION_REGISTER_OP:
4770                         if (res[i].register_op >= IORING_REGISTER_LAST) {
4771                                 ret = -EINVAL;
4772                                 goto out;
4773                         }
4774
4775                         __set_bit(res[i].register_op,
4776                                   ctx->restrictions.register_op);
4777                         break;
4778                 case IORING_RESTRICTION_SQE_OP:
4779                         if (res[i].sqe_op >= IORING_OP_LAST) {
4780                                 ret = -EINVAL;
4781                                 goto out;
4782                         }
4783
4784                         __set_bit(res[i].sqe_op, ctx->restrictions.sqe_op);
4785                         break;
4786                 case IORING_RESTRICTION_SQE_FLAGS_ALLOWED:
4787                         ctx->restrictions.sqe_flags_allowed = res[i].sqe_flags;
4788                         break;
4789                 case IORING_RESTRICTION_SQE_FLAGS_REQUIRED:
4790                         ctx->restrictions.sqe_flags_required = res[i].sqe_flags;
4791                         break;
4792                 default:
4793                         ret = -EINVAL;
4794                         goto out;
4795                 }
4796         }
4797
4798 out:
4799         /* Reset all restrictions if an error happened */
4800         if (ret != 0)
4801                 memset(&ctx->restrictions, 0, sizeof(ctx->restrictions));
4802         else
4803                 ctx->restrictions.registered = true;
4804
4805         kfree(res);
4806         return ret;
4807 }
4808
4809 static int io_register_enable_rings(struct io_ring_ctx *ctx)
4810 {
4811         if (!(ctx->flags & IORING_SETUP_R_DISABLED))
4812                 return -EBADFD;
4813
4814         if (ctx->restrictions.registered)
4815                 ctx->restricted = 1;
4816
4817         ctx->flags &= ~IORING_SETUP_R_DISABLED;
4818         if (ctx->sq_data && wq_has_sleeper(&ctx->sq_data->wait))
4819                 wake_up(&ctx->sq_data->wait);
4820         return 0;
4821 }
4822
4823 static __cold int io_register_iowq_aff(struct io_ring_ctx *ctx,
4824                                        void __user *arg, unsigned len)
4825 {
4826         struct io_uring_task *tctx = current->io_uring;
4827         cpumask_var_t new_mask;
4828         int ret;
4829
4830         if (!tctx || !tctx->io_wq)
4831                 return -EINVAL;
4832
4833         if (!alloc_cpumask_var(&new_mask, GFP_KERNEL))
4834                 return -ENOMEM;
4835
4836         cpumask_clear(new_mask);
4837         if (len > cpumask_size())
4838                 len = cpumask_size();
4839
4840         if (in_compat_syscall()) {
4841                 ret = compat_get_bitmap(cpumask_bits(new_mask),
4842                                         (const compat_ulong_t __user *)arg,
4843                                         len * 8 /* CHAR_BIT */);
4844         } else {
4845                 ret = copy_from_user(new_mask, arg, len);
4846         }
4847
4848         if (ret) {
4849                 free_cpumask_var(new_mask);
4850                 return -EFAULT;
4851         }
4852
4853         ret = io_wq_cpu_affinity(tctx->io_wq, new_mask);
4854         free_cpumask_var(new_mask);
4855         return ret;
4856 }
4857
4858 static __cold int io_unregister_iowq_aff(struct io_ring_ctx *ctx)
4859 {
4860         struct io_uring_task *tctx = current->io_uring;
4861
4862         if (!tctx || !tctx->io_wq)
4863                 return -EINVAL;
4864
4865         return io_wq_cpu_affinity(tctx->io_wq, NULL);
4866 }
4867
4868 static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
4869                                                void __user *arg)
4870         __must_hold(&ctx->uring_lock)
4871 {
4872         struct io_tctx_node *node;
4873         struct io_uring_task *tctx = NULL;
4874         struct io_sq_data *sqd = NULL;
4875         __u32 new_count[2];
4876         int i, ret;
4877
4878         if (copy_from_user(new_count, arg, sizeof(new_count)))
4879                 return -EFAULT;
4880         for (i = 0; i < ARRAY_SIZE(new_count); i++)
4881                 if (new_count[i] > INT_MAX)
4882                         return -EINVAL;
4883
4884         if (ctx->flags & IORING_SETUP_SQPOLL) {
4885                 sqd = ctx->sq_data;
4886                 if (sqd) {
4887                         /*
4888                          * Observe the correct sqd->lock -> ctx->uring_lock
4889                          * ordering. Fine to drop uring_lock here, we hold
4890                          * a ref to the ctx.
4891                          */
4892                         refcount_inc(&sqd->refs);
4893                         mutex_unlock(&ctx->uring_lock);
4894                         mutex_lock(&sqd->lock);
4895                         mutex_lock(&ctx->uring_lock);
4896                         if (sqd->thread)
4897                                 tctx = sqd->thread->io_uring;
4898                 }
4899         } else {
4900                 tctx = current->io_uring;
4901         }
4902
4903         BUILD_BUG_ON(sizeof(new_count) != sizeof(ctx->iowq_limits));
4904
4905         for (i = 0; i < ARRAY_SIZE(new_count); i++)
4906                 if (new_count[i])
4907                         ctx->iowq_limits[i] = new_count[i];
4908         ctx->iowq_limits_set = true;
4909
4910         if (tctx && tctx->io_wq) {
4911                 ret = io_wq_max_workers(tctx->io_wq, new_count);
4912                 if (ret)
4913                         goto err;
4914         } else {
4915                 memset(new_count, 0, sizeof(new_count));
4916         }
4917
4918         if (sqd) {
4919                 mutex_unlock(&sqd->lock);
4920                 io_put_sq_data(sqd);
4921         }
4922
4923         if (copy_to_user(arg, new_count, sizeof(new_count)))
4924                 return -EFAULT;
4925
4926         /* that's it for SQPOLL, only the SQPOLL task creates requests */
4927         if (sqd)
4928                 return 0;
4929
4930         /* now propagate the restriction to all registered users */
4931         list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
4932                 struct io_uring_task *tctx = node->task->io_uring;
4933
4934                 if (WARN_ON_ONCE(!tctx->io_wq))
4935                         continue;
4936
4937                 for (i = 0; i < ARRAY_SIZE(new_count); i++)
4938                         new_count[i] = ctx->iowq_limits[i];
4939                 /* ignore errors, it always returns zero anyway */
4940                 (void)io_wq_max_workers(tctx->io_wq, new_count);
4941         }
4942         return 0;
4943 err:
4944         if (sqd) {
4945                 mutex_unlock(&sqd->lock);
4946                 io_put_sq_data(sqd);
4947         }
4948         return ret;
4949 }
4950
4951 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
4952                                void __user *arg, unsigned nr_args)
4953         __releases(ctx->uring_lock)
4954         __acquires(ctx->uring_lock)
4955 {
4956         int ret;
4957
4958         /*
4959          * We're inside the ring mutex, if the ref is already dying, then
4960          * someone else killed the ctx or is already going through
4961          * io_uring_register().
4962          */
4963         if (percpu_ref_is_dying(&ctx->refs))
4964                 return -ENXIO;
4965
4966         if (ctx->restricted) {
4967                 if (opcode >= IORING_REGISTER_LAST)
4968                         return -EINVAL;
4969                 opcode = array_index_nospec(opcode, IORING_REGISTER_LAST);
4970                 if (!test_bit(opcode, ctx->restrictions.register_op))
4971                         return -EACCES;
4972         }
4973
4974         switch (opcode) {
4975         case IORING_REGISTER_BUFFERS:
4976                 ret = -EFAULT;
4977                 if (!arg)
4978                         break;
4979                 ret = io_sqe_buffers_register(ctx, arg, nr_args, NULL);
4980                 break;
4981         case IORING_UNREGISTER_BUFFERS:
4982                 ret = -EINVAL;
4983                 if (arg || nr_args)
4984                         break;
4985                 ret = io_sqe_buffers_unregister(ctx);
4986                 break;
4987         case IORING_REGISTER_FILES:
4988                 ret = -EFAULT;
4989                 if (!arg)
4990                         break;
4991                 ret = io_sqe_files_register(ctx, arg, nr_args, NULL);
4992                 break;
4993         case IORING_UNREGISTER_FILES:
4994                 ret = -EINVAL;
4995                 if (arg || nr_args)
4996                         break;
4997                 ret = io_sqe_files_unregister(ctx);
4998                 break;
4999         case IORING_REGISTER_FILES_UPDATE:
5000                 ret = io_register_files_update(ctx, arg, nr_args);
5001                 break;
5002         case IORING_REGISTER_EVENTFD:
5003                 ret = -EINVAL;
5004                 if (nr_args != 1)
5005                         break;
5006                 ret = io_eventfd_register(ctx, arg, 0);
5007                 break;
5008         case IORING_REGISTER_EVENTFD_ASYNC:
5009                 ret = -EINVAL;
5010                 if (nr_args != 1)
5011                         break;
5012                 ret = io_eventfd_register(ctx, arg, 1);
5013                 break;
5014         case IORING_UNREGISTER_EVENTFD:
5015                 ret = -EINVAL;
5016                 if (arg || nr_args)
5017                         break;
5018                 ret = io_eventfd_unregister(ctx);
5019                 break;
5020         case IORING_REGISTER_PROBE:
5021                 ret = -EINVAL;
5022                 if (!arg || nr_args > 256)
5023                         break;
5024                 ret = io_probe(ctx, arg, nr_args);
5025                 break;
5026         case IORING_REGISTER_PERSONALITY:
5027                 ret = -EINVAL;
5028                 if (arg || nr_args)
5029                         break;
5030                 ret = io_register_personality(ctx);
5031                 break;
5032         case IORING_UNREGISTER_PERSONALITY:
5033                 ret = -EINVAL;
5034                 if (arg)
5035                         break;
5036                 ret = io_unregister_personality(ctx, nr_args);
5037                 break;
5038         case IORING_REGISTER_ENABLE_RINGS:
5039                 ret = -EINVAL;
5040                 if (arg || nr_args)
5041                         break;
5042                 ret = io_register_enable_rings(ctx);
5043                 break;
5044         case IORING_REGISTER_RESTRICTIONS:
5045                 ret = io_register_restrictions(ctx, arg, nr_args);
5046                 break;
5047         case IORING_REGISTER_FILES2:
5048                 ret = io_register_rsrc(ctx, arg, nr_args, IORING_RSRC_FILE);
5049                 break;
5050         case IORING_REGISTER_FILES_UPDATE2:
5051                 ret = io_register_rsrc_update(ctx, arg, nr_args,
5052                                               IORING_RSRC_FILE);
5053                 break;
5054         case IORING_REGISTER_BUFFERS2:
5055                 ret = io_register_rsrc(ctx, arg, nr_args, IORING_RSRC_BUFFER);
5056                 break;
5057         case IORING_REGISTER_BUFFERS_UPDATE:
5058                 ret = io_register_rsrc_update(ctx, arg, nr_args,
5059                                               IORING_RSRC_BUFFER);
5060                 break;
5061         case IORING_REGISTER_IOWQ_AFF:
5062                 ret = -EINVAL;
5063                 if (!arg || !nr_args)
5064                         break;
5065                 ret = io_register_iowq_aff(ctx, arg, nr_args);
5066                 break;
5067         case IORING_UNREGISTER_IOWQ_AFF:
5068                 ret = -EINVAL;
5069                 if (arg || nr_args)
5070                         break;
5071                 ret = io_unregister_iowq_aff(ctx);
5072                 break;
5073         case IORING_REGISTER_IOWQ_MAX_WORKERS:
5074                 ret = -EINVAL;
5075                 if (!arg || nr_args != 2)
5076                         break;
5077                 ret = io_register_iowq_max_workers(ctx, arg);
5078                 break;
5079         case IORING_REGISTER_RING_FDS:
5080                 ret = io_ringfd_register(ctx, arg, nr_args);
5081                 break;
5082         case IORING_UNREGISTER_RING_FDS:
5083                 ret = io_ringfd_unregister(ctx, arg, nr_args);
5084                 break;
5085         case IORING_REGISTER_PBUF_RING:
5086                 ret = -EINVAL;
5087                 if (!arg || nr_args != 1)
5088                         break;
5089                 ret = io_register_pbuf_ring(ctx, arg);
5090                 break;
5091         case IORING_UNREGISTER_PBUF_RING:
5092                 ret = -EINVAL;
5093                 if (!arg || nr_args != 1)
5094                         break;
5095                 ret = io_unregister_pbuf_ring(ctx, arg);
5096                 break;
5097         default:
5098                 ret = -EINVAL;
5099                 break;
5100         }
5101
5102         return ret;
5103 }
5104
5105 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
5106                 void __user *, arg, unsigned int, nr_args)
5107 {
5108         struct io_ring_ctx *ctx;
5109         long ret = -EBADF;
5110         struct fd f;
5111
5112         f = fdget(fd);
5113         if (!f.file)
5114                 return -EBADF;
5115
5116         ret = -EOPNOTSUPP;
5117         if (!io_is_uring_fops(f.file))
5118                 goto out_fput;
5119
5120         ctx = f.file->private_data;
5121
5122         io_run_task_work();
5123
5124         mutex_lock(&ctx->uring_lock);
5125         ret = __io_uring_register(ctx, opcode, arg, nr_args);
5126         mutex_unlock(&ctx->uring_lock);
5127         trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, ret);
5128 out_fput:
5129         fdput(f);
5130         return ret;
5131 }
5132
5133 static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags)
5134 {
5135         WARN_ON_ONCE(1);
5136         return -ECANCELED;
5137 }
5138
5139 const struct io_op_def io_op_defs[] = {
5140         [IORING_OP_NOP] = {
5141                 .audit_skip             = 1,
5142                 .iopoll                 = 1,
5143                 .name                   = "NOP",
5144                 .prep                   = io_nop_prep,
5145                 .issue                  = io_nop,
5146         },
5147         [IORING_OP_READV] = {
5148                 .needs_file             = 1,
5149                 .unbound_nonreg_file    = 1,
5150                 .pollin                 = 1,
5151                 .buffer_select          = 1,
5152                 .plug                   = 1,
5153                 .audit_skip             = 1,
5154                 .ioprio                 = 1,
5155                 .iopoll                 = 1,
5156                 .async_size             = sizeof(struct io_async_rw),
5157                 .name                   = "READV",
5158                 .prep                   = io_prep_rw,
5159                 .issue                  = io_read,
5160                 .prep_async             = io_readv_prep_async,
5161                 .cleanup                = io_readv_writev_cleanup,
5162         },
5163         [IORING_OP_WRITEV] = {
5164                 .needs_file             = 1,
5165                 .hash_reg_file          = 1,
5166                 .unbound_nonreg_file    = 1,
5167                 .pollout                = 1,
5168                 .plug                   = 1,
5169                 .audit_skip             = 1,
5170                 .ioprio                 = 1,
5171                 .iopoll                 = 1,
5172                 .async_size             = sizeof(struct io_async_rw),
5173                 .name                   = "WRITEV",
5174                 .prep                   = io_prep_rw,
5175                 .issue                  = io_write,
5176                 .prep_async             = io_writev_prep_async,
5177                 .cleanup                = io_readv_writev_cleanup,
5178         },
5179         [IORING_OP_FSYNC] = {
5180                 .needs_file             = 1,
5181                 .audit_skip             = 1,
5182                 .name                   = "FSYNC",
5183                 .prep                   = io_fsync_prep,
5184                 .issue                  = io_fsync,
5185         },
5186         [IORING_OP_READ_FIXED] = {
5187                 .needs_file             = 1,
5188                 .unbound_nonreg_file    = 1,
5189                 .pollin                 = 1,
5190                 .plug                   = 1,
5191                 .audit_skip             = 1,
5192                 .ioprio                 = 1,
5193                 .iopoll                 = 1,
5194                 .async_size             = sizeof(struct io_async_rw),
5195                 .name                   = "READ_FIXED",
5196                 .prep                   = io_prep_rw,
5197                 .issue                  = io_read,
5198         },
5199         [IORING_OP_WRITE_FIXED] = {
5200                 .needs_file             = 1,
5201                 .hash_reg_file          = 1,
5202                 .unbound_nonreg_file    = 1,
5203                 .pollout                = 1,
5204                 .plug                   = 1,
5205                 .audit_skip             = 1,
5206                 .ioprio                 = 1,
5207                 .iopoll                 = 1,
5208                 .async_size             = sizeof(struct io_async_rw),
5209                 .name                   = "WRITE_FIXED",
5210                 .prep                   = io_prep_rw,
5211                 .issue                  = io_write,
5212         },
5213         [IORING_OP_POLL_ADD] = {
5214                 .needs_file             = 1,
5215                 .unbound_nonreg_file    = 1,
5216                 .audit_skip             = 1,
5217                 .name                   = "POLL_ADD",
5218                 .prep                   = io_poll_add_prep,
5219                 .issue                  = io_poll_add,
5220         },
5221         [IORING_OP_POLL_REMOVE] = {
5222                 .audit_skip             = 1,
5223                 .name                   = "POLL_REMOVE",
5224                 .prep                   = io_poll_remove_prep,
5225                 .issue                  = io_poll_remove,
5226         },
5227         [IORING_OP_SYNC_FILE_RANGE] = {
5228                 .needs_file             = 1,
5229                 .audit_skip             = 1,
5230                 .name                   = "SYNC_FILE_RANGE",
5231                 .prep                   = io_sfr_prep,
5232                 .issue                  = io_sync_file_range,
5233         },
5234         [IORING_OP_SENDMSG] = {
5235                 .needs_file             = 1,
5236                 .unbound_nonreg_file    = 1,
5237                 .pollout                = 1,
5238                 .ioprio                 = 1,
5239                 .name                   = "SENDMSG",
5240 #if defined(CONFIG_NET)
5241                 .async_size             = sizeof(struct io_async_msghdr),
5242                 .prep                   = io_sendmsg_prep,
5243                 .issue                  = io_sendmsg,
5244                 .prep_async             = io_sendmsg_prep_async,
5245                 .cleanup                = io_sendmsg_recvmsg_cleanup,
5246 #else
5247                 .prep                   = io_eopnotsupp_prep,
5248 #endif
5249         },
5250         [IORING_OP_RECVMSG] = {
5251                 .needs_file             = 1,
5252                 .unbound_nonreg_file    = 1,
5253                 .pollin                 = 1,
5254                 .buffer_select          = 1,
5255                 .ioprio                 = 1,
5256                 .name                   = "RECVMSG",
5257 #if defined(CONFIG_NET)
5258                 .async_size             = sizeof(struct io_async_msghdr),
5259                 .prep                   = io_recvmsg_prep,
5260                 .issue                  = io_recvmsg,
5261                 .prep_async             = io_recvmsg_prep_async,
5262                 .cleanup                = io_sendmsg_recvmsg_cleanup,
5263 #else
5264                 .prep                   = io_eopnotsupp_prep,
5265 #endif
5266         },
5267         [IORING_OP_TIMEOUT] = {
5268                 .audit_skip             = 1,
5269                 .async_size             = sizeof(struct io_timeout_data),
5270                 .name                   = "TIMEOUT",
5271                 .prep                   = io_timeout_prep,
5272                 .issue                  = io_timeout,
5273         },
5274         [IORING_OP_TIMEOUT_REMOVE] = {
5275                 /* used by timeout updates' prep() */
5276                 .audit_skip             = 1,
5277                 .name                   = "TIMEOUT_REMOVE",
5278                 .prep                   = io_timeout_remove_prep,
5279                 .issue                  = io_timeout_remove,
5280         },
5281         [IORING_OP_ACCEPT] = {
5282                 .needs_file             = 1,
5283                 .unbound_nonreg_file    = 1,
5284                 .pollin                 = 1,
5285                 .poll_exclusive         = 1,
5286                 .ioprio                 = 1,    /* used for flags */
5287                 .name                   = "ACCEPT",
5288 #if defined(CONFIG_NET)
5289                 .prep                   = io_accept_prep,
5290                 .issue                  = io_accept,
5291 #else
5292                 .prep                   = io_eopnotsupp_prep,
5293 #endif
5294         },
5295         [IORING_OP_ASYNC_CANCEL] = {
5296                 .audit_skip             = 1,
5297                 .name                   = "ASYNC_CANCEL",
5298                 .prep                   = io_async_cancel_prep,
5299                 .issue                  = io_async_cancel,
5300         },
5301         [IORING_OP_LINK_TIMEOUT] = {
5302                 .audit_skip             = 1,
5303                 .async_size             = sizeof(struct io_timeout_data),
5304                 .name                   = "LINK_TIMEOUT",
5305                 .prep                   = io_link_timeout_prep,
5306                 .issue                  = io_no_issue,
5307         },
5308         [IORING_OP_CONNECT] = {
5309                 .needs_file             = 1,
5310                 .unbound_nonreg_file    = 1,
5311                 .pollout                = 1,
5312                 .name                   = "CONNECT",
5313 #if defined(CONFIG_NET)
5314                 .async_size             = sizeof(struct io_async_connect),
5315                 .prep                   = io_connect_prep,
5316                 .issue                  = io_connect,
5317                 .prep_async             = io_connect_prep_async,
5318 #else
5319                 .prep                   = io_eopnotsupp_prep,
5320 #endif
5321         },
5322         [IORING_OP_FALLOCATE] = {
5323                 .needs_file             = 1,
5324                 .name                   = "FALLOCATE",
5325                 .prep                   = io_fallocate_prep,
5326                 .issue                  = io_fallocate,
5327         },
5328         [IORING_OP_OPENAT] = {
5329                 .name                   = "OPENAT",
5330                 .prep                   = io_openat_prep,
5331                 .issue                  = io_openat,
5332                 .cleanup                = io_open_cleanup,
5333         },
5334         [IORING_OP_CLOSE] = {
5335                 .name                   = "CLOSE",
5336                 .prep                   = io_close_prep,
5337                 .issue                  = io_close,
5338         },
5339         [IORING_OP_FILES_UPDATE] = {
5340                 .audit_skip             = 1,
5341                 .iopoll                 = 1,
5342                 .name                   = "FILES_UPDATE",
5343                 .prep                   = io_files_update_prep,
5344                 .issue                  = io_files_update,
5345         },
5346         [IORING_OP_STATX] = {
5347                 .audit_skip             = 1,
5348                 .name                   = "STATX",
5349                 .prep                   = io_statx_prep,
5350                 .issue                  = io_statx,
5351                 .cleanup                = io_statx_cleanup,
5352         },
5353         [IORING_OP_READ] = {
5354                 .needs_file             = 1,
5355                 .unbound_nonreg_file    = 1,
5356                 .pollin                 = 1,
5357                 .buffer_select          = 1,
5358                 .plug                   = 1,
5359                 .audit_skip             = 1,
5360                 .ioprio                 = 1,
5361                 .iopoll                 = 1,
5362                 .async_size             = sizeof(struct io_async_rw),
5363                 .name                   = "READ",
5364                 .prep                   = io_prep_rw,
5365                 .issue                  = io_read,
5366         },
5367         [IORING_OP_WRITE] = {
5368                 .needs_file             = 1,
5369                 .hash_reg_file          = 1,
5370                 .unbound_nonreg_file    = 1,
5371                 .pollout                = 1,
5372                 .plug                   = 1,
5373                 .audit_skip             = 1,
5374                 .ioprio                 = 1,
5375                 .iopoll                 = 1,
5376                 .async_size             = sizeof(struct io_async_rw),
5377                 .name                   = "WRITE",
5378                 .prep                   = io_prep_rw,
5379                 .issue                  = io_write,
5380         },
5381         [IORING_OP_FADVISE] = {
5382                 .needs_file             = 1,
5383                 .audit_skip             = 1,
5384                 .name                   = "FADVISE",
5385                 .prep                   = io_fadvise_prep,
5386                 .issue                  = io_fadvise,
5387         },
5388         [IORING_OP_MADVISE] = {
5389                 .name                   = "MADVISE",
5390                 .prep                   = io_madvise_prep,
5391                 .issue                  = io_madvise,
5392         },
5393         [IORING_OP_SEND] = {
5394                 .needs_file             = 1,
5395                 .unbound_nonreg_file    = 1,
5396                 .pollout                = 1,
5397                 .audit_skip             = 1,
5398                 .ioprio                 = 1,
5399                 .name                   = "SEND",
5400 #if defined(CONFIG_NET)
5401                 .prep                   = io_sendmsg_prep,
5402                 .issue                  = io_send,
5403 #else
5404                 .prep                   = io_eopnotsupp_prep,
5405 #endif
5406         },
5407         [IORING_OP_RECV] = {
5408                 .needs_file             = 1,
5409                 .unbound_nonreg_file    = 1,
5410                 .pollin                 = 1,
5411                 .buffer_select          = 1,
5412                 .audit_skip             = 1,
5413                 .ioprio                 = 1,
5414                 .name                   = "RECV",
5415 #if defined(CONFIG_NET)
5416                 .prep                   = io_recvmsg_prep,
5417                 .issue                  = io_recv,
5418 #else
5419                 .prep                   = io_eopnotsupp_prep,
5420 #endif
5421         },
5422         [IORING_OP_OPENAT2] = {
5423                 .name                   = "OPENAT2",
5424                 .prep                   = io_openat2_prep,
5425                 .issue                  = io_openat2,
5426                 .cleanup                = io_open_cleanup,
5427         },
5428         [IORING_OP_EPOLL_CTL] = {
5429                 .unbound_nonreg_file    = 1,
5430                 .audit_skip             = 1,
5431                 .name                   = "EPOLL",
5432 #if defined(CONFIG_EPOLL)
5433                 .prep                   = io_epoll_ctl_prep,
5434                 .issue                  = io_epoll_ctl,
5435 #else
5436                 .prep                   = io_eopnotsupp_prep,
5437 #endif
5438         },
5439         [IORING_OP_SPLICE] = {
5440                 .needs_file             = 1,
5441                 .hash_reg_file          = 1,
5442                 .unbound_nonreg_file    = 1,
5443                 .audit_skip             = 1,
5444                 .name                   = "SPLICE",
5445                 .prep                   = io_splice_prep,
5446                 .issue                  = io_splice,
5447         },
5448         [IORING_OP_PROVIDE_BUFFERS] = {
5449                 .audit_skip             = 1,
5450                 .iopoll                 = 1,
5451                 .name                   = "PROVIDE_BUFFERS",
5452                 .prep                   = io_provide_buffers_prep,
5453                 .issue                  = io_provide_buffers,
5454         },
5455         [IORING_OP_REMOVE_BUFFERS] = {
5456                 .audit_skip             = 1,
5457                 .iopoll                 = 1,
5458                 .name                   = "REMOVE_BUFFERS",
5459                 .prep                   = io_remove_buffers_prep,
5460                 .issue                  = io_remove_buffers,
5461         },
5462         [IORING_OP_TEE] = {
5463                 .needs_file             = 1,
5464                 .hash_reg_file          = 1,
5465                 .unbound_nonreg_file    = 1,
5466                 .audit_skip             = 1,
5467                 .name                   = "TEE",
5468                 .prep                   = io_tee_prep,
5469                 .issue                  = io_tee,
5470         },
5471         [IORING_OP_SHUTDOWN] = {
5472                 .needs_file             = 1,
5473                 .name                   = "SHUTDOWN",
5474 #if defined(CONFIG_NET)
5475                 .prep                   = io_shutdown_prep,
5476                 .issue                  = io_shutdown,
5477 #else
5478                 .prep                   = io_eopnotsupp_prep,
5479 #endif
5480         },
5481         [IORING_OP_RENAMEAT] = {
5482                 .name                   = "RENAMEAT",
5483                 .prep                   = io_renameat_prep,
5484                 .issue                  = io_renameat,
5485                 .cleanup                = io_renameat_cleanup,
5486         },
5487         [IORING_OP_UNLINKAT] = {
5488                 .name                   = "UNLINKAT",
5489                 .prep                   = io_unlinkat_prep,
5490                 .issue                  = io_unlinkat,
5491                 .cleanup                = io_unlinkat_cleanup,
5492         },
5493         [IORING_OP_MKDIRAT] = {
5494                 .name                   = "MKDIRAT",
5495                 .prep                   = io_mkdirat_prep,
5496                 .issue                  = io_mkdirat,
5497                 .cleanup                = io_mkdirat_cleanup,
5498         },
5499         [IORING_OP_SYMLINKAT] = {
5500                 .name                   = "SYMLINKAT",
5501                 .prep                   = io_symlinkat_prep,
5502                 .issue                  = io_symlinkat,
5503                 .cleanup                = io_link_cleanup,
5504         },
5505         [IORING_OP_LINKAT] = {
5506                 .name                   = "LINKAT",
5507                 .prep                   = io_linkat_prep,
5508                 .issue                  = io_linkat,
5509                 .cleanup                = io_link_cleanup,
5510         },
5511         [IORING_OP_MSG_RING] = {
5512                 .needs_file             = 1,
5513                 .iopoll                 = 1,
5514                 .name                   = "MSG_RING",
5515                 .prep                   = io_msg_ring_prep,
5516                 .issue                  = io_msg_ring,
5517         },
5518         [IORING_OP_FSETXATTR] = {
5519                 .needs_file = 1,
5520                 .name                   = "FSETXATTR",
5521                 .prep                   = io_fsetxattr_prep,
5522                 .issue                  = io_fsetxattr,
5523                 .cleanup                = io_xattr_cleanup,
5524         },
5525         [IORING_OP_SETXATTR] = {
5526                 .name                   = "SETXATTR",
5527                 .prep                   = io_setxattr_prep,
5528                 .issue                  = io_setxattr,
5529                 .cleanup                = io_xattr_cleanup,
5530         },
5531         [IORING_OP_FGETXATTR] = {
5532                 .needs_file = 1,
5533                 .name                   = "FGETXATTR",
5534                 .prep                   = io_fgetxattr_prep,
5535                 .issue                  = io_fgetxattr,
5536                 .cleanup                = io_xattr_cleanup,
5537         },
5538         [IORING_OP_GETXATTR] = {
5539                 .name                   = "GETXATTR",
5540                 .prep                   = io_getxattr_prep,
5541                 .issue                  = io_getxattr,
5542                 .cleanup                = io_xattr_cleanup,
5543         },
5544         [IORING_OP_SOCKET] = {
5545                 .audit_skip             = 1,
5546                 .name                   = "SOCKET",
5547 #if defined(CONFIG_NET)
5548                 .prep                   = io_socket_prep,
5549                 .issue                  = io_socket,
5550 #else
5551                 .prep                   = io_eopnotsupp_prep,
5552 #endif
5553         },
5554         [IORING_OP_URING_CMD] = {
5555                 .needs_file             = 1,
5556                 .plug                   = 1,
5557                 .name                   = "URING_CMD",
5558                 .async_size             = uring_cmd_pdu_size(1),
5559                 .prep                   = io_uring_cmd_prep,
5560                 .issue                  = io_uring_cmd,
5561                 .prep_async             = io_uring_cmd_prep_async,
5562         },
5563 };
5564
5565 static int __init io_uring_init(void)
5566 {
5567         int i;
5568
5569 #define __BUILD_BUG_VERIFY_ELEMENT(stype, eoffset, etype, ename) do { \
5570         BUILD_BUG_ON(offsetof(stype, ename) != eoffset); \
5571         BUILD_BUG_ON(sizeof(etype) != sizeof_field(stype, ename)); \
5572 } while (0)
5573
5574 #define BUILD_BUG_SQE_ELEM(eoffset, etype, ename) \
5575         __BUILD_BUG_VERIFY_ELEMENT(struct io_uring_sqe, eoffset, etype, ename)
5576         BUILD_BUG_ON(sizeof(struct io_uring_sqe) != 64);
5577         BUILD_BUG_SQE_ELEM(0,  __u8,   opcode);
5578         BUILD_BUG_SQE_ELEM(1,  __u8,   flags);
5579         BUILD_BUG_SQE_ELEM(2,  __u16,  ioprio);
5580         BUILD_BUG_SQE_ELEM(4,  __s32,  fd);
5581         BUILD_BUG_SQE_ELEM(8,  __u64,  off);
5582         BUILD_BUG_SQE_ELEM(8,  __u64,  addr2);
5583         BUILD_BUG_SQE_ELEM(16, __u64,  addr);
5584         BUILD_BUG_SQE_ELEM(16, __u64,  splice_off_in);
5585         BUILD_BUG_SQE_ELEM(24, __u32,  len);
5586         BUILD_BUG_SQE_ELEM(28,     __kernel_rwf_t, rw_flags);
5587         BUILD_BUG_SQE_ELEM(28, /* compat */   int, rw_flags);
5588         BUILD_BUG_SQE_ELEM(28, /* compat */ __u32, rw_flags);
5589         BUILD_BUG_SQE_ELEM(28, __u32,  fsync_flags);
5590         BUILD_BUG_SQE_ELEM(28, /* compat */ __u16,  poll_events);
5591         BUILD_BUG_SQE_ELEM(28, __u32,  poll32_events);
5592         BUILD_BUG_SQE_ELEM(28, __u32,  sync_range_flags);
5593         BUILD_BUG_SQE_ELEM(28, __u32,  msg_flags);
5594         BUILD_BUG_SQE_ELEM(28, __u32,  timeout_flags);
5595         BUILD_BUG_SQE_ELEM(28, __u32,  accept_flags);
5596         BUILD_BUG_SQE_ELEM(28, __u32,  cancel_flags);
5597         BUILD_BUG_SQE_ELEM(28, __u32,  open_flags);
5598         BUILD_BUG_SQE_ELEM(28, __u32,  statx_flags);
5599         BUILD_BUG_SQE_ELEM(28, __u32,  fadvise_advice);
5600         BUILD_BUG_SQE_ELEM(28, __u32,  splice_flags);
5601         BUILD_BUG_SQE_ELEM(32, __u64,  user_data);
5602         BUILD_BUG_SQE_ELEM(40, __u16,  buf_index);
5603         BUILD_BUG_SQE_ELEM(40, __u16,  buf_group);
5604         BUILD_BUG_SQE_ELEM(42, __u16,  personality);
5605         BUILD_BUG_SQE_ELEM(44, __s32,  splice_fd_in);
5606         BUILD_BUG_SQE_ELEM(44, __u32,  file_index);
5607         BUILD_BUG_SQE_ELEM(48, __u64,  addr3);
5608
5609         BUILD_BUG_ON(sizeof(struct io_uring_files_update) !=
5610                      sizeof(struct io_uring_rsrc_update));
5611         BUILD_BUG_ON(sizeof(struct io_uring_rsrc_update) >
5612                      sizeof(struct io_uring_rsrc_update2));
5613
5614         /* ->buf_index is u16 */
5615         BUILD_BUG_ON(offsetof(struct io_uring_buf_ring, bufs) != 0);
5616         BUILD_BUG_ON(offsetof(struct io_uring_buf, resv) !=
5617                      offsetof(struct io_uring_buf_ring, tail));
5618
5619         /* should fit into one byte */
5620         BUILD_BUG_ON(SQE_VALID_FLAGS >= (1 << 8));
5621         BUILD_BUG_ON(SQE_COMMON_FLAGS >= (1 << 8));
5622         BUILD_BUG_ON((SQE_VALID_FLAGS | SQE_COMMON_FLAGS) != SQE_VALID_FLAGS);
5623
5624         BUILD_BUG_ON(ARRAY_SIZE(io_op_defs) != IORING_OP_LAST);
5625         BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof(int));
5626
5627         BUILD_BUG_ON(sizeof(atomic_t) != sizeof(u32));
5628
5629         for (i = 0; i < ARRAY_SIZE(io_op_defs); i++) {
5630                 BUG_ON(!io_op_defs[i].prep);
5631                 if (io_op_defs[i].prep != io_eopnotsupp_prep)
5632                         BUG_ON(!io_op_defs[i].issue);
5633                 WARN_ON_ONCE(!io_op_defs[i].name);
5634         }
5635
5636         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC |
5637                                 SLAB_ACCOUNT);
5638         return 0;
5639 };
5640 __initcall(io_uring_init);