1 // SPDX-License-Identifier: GPL-2.0
2 /*
3 * Shared application/kernel submission and completion ring pairs, for
4 * supporting fast/efficient IO.
6 * A note on the read/write ordering memory barriers that are matched between
7 * the application and kernel side.
9 * After the application reads the CQ ring tail, it must use an
10 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11 * before writing the tail (using smp_load_acquire to read the tail will
12 * do). It also needs a smp_mb() before updating CQ head (ordering the
13 * entry load(s) with the head store), pairing with an implicit barrier
14 * through a control-dependency in io_get_cqe (smp_store_release to
15 * store head will do). Failure to do so could lead to reading invalid
16 * CQ entries.
18 * Likewise, the application must use an appropriate smp_wmb() before
19 * writing the SQ tail (ordering SQ entry stores with the tail store),
20 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21 * to store the tail will do). And it needs a barrier ordering the SQ
22 * head load before writing new SQ entries (smp_load_acquire to read
23 * head will do).
25 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27 * updating the SQ tail; a full memory barrier smp_mb() is needed
28 * between.
30 * Also see the examples in the liburing library:
32 * git://git.kernel.dk/liburing
34 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35 * from data shared between the kernel and application. This is done both
36 * for ordering purposes, but also to ensure that once a value is loaded from
37 * data that the application could potentially modify, it remains stable.
39 * Copyright (C) 2018-2019 Jens Axboe
40 * Copyright (c) 2018-2019 Christoph Hellwig
41 */
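/*
 * Illustration only (not part of this file): a minimal userspace sketch of
 * reaping CQEs that follows the ordering rules above. khead, ktail, cqes and
 * mask are assumed to point into the mmap'ed CQ ring (the way liburing sets
 * them up), smp_load_acquire()/smp_store_release() stand for the userspace
 * barrier helpers liburing provides, and handle_completion() is a stand-in
 * for application code:
 *
 *	unsigned head = *khead;
 *	unsigned tail = smp_load_acquire(ktail);	// pairs with kernel tail store
 *
 *	while (head != tail) {
 *		struct io_uring_cqe *cqe = &cqes[head & mask];
 *
 *		handle_completion(cqe->user_data, cqe->res, cqe->flags);
 *		head++;
 *	}
 *	smp_store_release(khead, head);			// publish consumed entries
 *
 * The submission side mirrors this: fill in the SQE(s), smp_store_release()
 * the SQ tail, and only then (for IORING_SETUP_SQPOLL) load the SQ flags to
 * check IORING_SQ_NEED_WAKEUP.
 */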
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <net/compat.h>
48 #include <linux/refcount.h>
49 #include <linux/uio.h>
50 #include <linux/bits.h>
52 #include <linux/sched/signal.h>
54 #include <linux/file.h>
55 #include <linux/fdtable.h>
57 #include <linux/mman.h>
58 #include <linux/percpu.h>
59 #include <linux/slab.h>
60 #include <linux/blk-mq.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
64 #include <net/af_unix.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/hugetlb.h>
72 #include <linux/highmem.h>
73 #include <linux/namei.h>
74 #include <linux/fsnotify.h>
75 #include <linux/fadvise.h>
76 #include <linux/eventpoll.h>
77 #include <linux/splice.h>
78 #include <linux/task_work.h>
79 #include <linux/pagemap.h>
80 #include <linux/io_uring.h>
81 #include <linux/audit.h>
82 #include <linux/security.h>
84 #define CREATE_TRACE_POINTS
85 #include <trace/events/io_uring.h>
87 #include <uapi/linux/io_uring.h>
89 #include "../fs/internal.h"
92 #include "io_uring_types.h"
102 #include "openclose.h"
103 #include "uring_cmd.h"
107 #include "msg_ring.h"
110 #define IORING_MAX_ENTRIES 32768
111 #define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES)
112 #define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
114 /* only define max */
115 #define IORING_MAX_FIXED_FILES (1U << 20)
116 #define IORING_MAX_RESTRICTIONS (IORING_RESTRICTION_LAST + \
117 IORING_REGISTER_LAST + IORING_OP_LAST)
119 #define IO_RSRC_TAG_TABLE_SHIFT (PAGE_SHIFT - 3)
120 #define IO_RSRC_TAG_TABLE_MAX (1U << IO_RSRC_TAG_TABLE_SHIFT)
121 #define IO_RSRC_TAG_TABLE_MASK (IO_RSRC_TAG_TABLE_MAX - 1)
123 #define IORING_MAX_REG_BUFFERS (1U << 14)
125 #define SQE_COMMON_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_LINK | \
126 IOSQE_IO_HARDLINK | IOSQE_ASYNC)
128 #define SQE_VALID_FLAGS (SQE_COMMON_FLAGS | IOSQE_BUFFER_SELECT | \
129 IOSQE_IO_DRAIN | IOSQE_CQE_SKIP_SUCCESS)
131 #define IO_REQ_CLEAN_FLAGS (REQ_F_BUFFER_SELECTED | REQ_F_NEED_CLEANUP | \
132 REQ_F_POLLED | REQ_F_INFLIGHT | REQ_F_CREDS | \
133 REQ_F_ASYNC_DATA)
135 #define IO_REQ_CLEAN_SLOW_FLAGS (REQ_F_REFCOUNT | REQ_F_LINK | REQ_F_HARDLINK |\
136 IO_REQ_CLEAN_FLAGS)
138 #define IO_TCTX_REFS_CACHE_NR (1U << 10)
140 struct io_mapped_ubuf {
141 u64 ubuf;
142 u64 ubuf_end;
143 unsigned int nr_bvecs;
144 unsigned long acct_pages;
145 struct bio_vec bvec[];
150 struct io_overflow_cqe {
151 struct list_head list;
152 struct io_uring_cqe cqe;
156 struct list_head list;
161 struct io_mapped_ubuf *buf;
165 struct io_rsrc_node {
166 struct percpu_ref refs;
167 struct list_head node;
168 struct list_head rsrc_list;
169 struct io_rsrc_data *rsrc_data;
170 struct llist_node llist;
174 typedef void (rsrc_put_fn)(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc);
176 struct io_rsrc_data {
177 struct io_ring_ctx *ctx;
183 struct completion done;
187 #define IO_BUFFER_LIST_BUF_PER_PAGE (PAGE_SIZE / sizeof(struct io_uring_buf))
188 struct io_buffer_list {
190 * If ->buf_nr_pages is set, then buf_pages/buf_ring are used. If not,
191 * then these are classic provided buffers and ->buf_list is used.
194 struct list_head buf_list;
196 struct page **buf_pages;
197 struct io_uring_buf_ring *buf_ring;
202 /* below is for ring provided buffers */
210 struct list_head list;
218 IO_SQ_THREAD_SHOULD_STOP = 0,
219 IO_SQ_THREAD_SHOULD_PARK,
224 atomic_t park_pending;
227 /* ctx's that are using this sqd */
228 struct list_head ctx_list;
230 struct task_struct *thread;
231 struct wait_queue_head wait;
233 unsigned sq_thread_idle;
239 struct completion exited;
242 #define IO_COMPL_BATCH 32
243 #define IO_REQ_CACHE_SIZE 32
244 #define IO_REQ_ALLOC_BATCH 8
246 #define BGID_ARRAY 64
249 * Arbitrary limit, can be raised if need be
251 #define IO_RINGFD_REG_MAX 16
253 struct io_uring_task {
254 /* submission side */
257 struct wait_queue_head wait;
258 const struct io_ring_ctx *last;
260 struct percpu_counter inflight;
261 atomic_t inflight_tracked;
264 spinlock_t task_lock;
265 struct io_wq_work_list task_list;
266 struct io_wq_work_list prio_task_list;
267 struct callback_head task_work;
268 struct file **registered_rings;
273 * First field must be the file pointer in all the
274 * iocb unions! See also 'struct kiocb' in <linux/fs.h>
278 struct wait_queue_head *head;
280 struct wait_queue_entry wait;
283 struct io_poll_update {
289 bool update_user_data;
300 /* NOTE: kiocb has the file as the first member, so don't do it here */
307 struct io_rsrc_update {
314 struct io_provide_buf {
324 struct iov_iter iter;
325 struct iov_iter_state iter_state;
326 struct iovec fast_iov[UIO_FASTIOV];
330 struct io_rw_state s;
331 const struct iovec *free_iovec;
333 struct wait_page_queue wpq;
338 struct io_poll *double_poll;
342 IORING_RSRC_FILE = 0,
343 IORING_RSRC_BUFFER = 1,
347 IO_CHECK_CQ_OVERFLOW_BIT,
348 IO_CHECK_CQ_DROPPED_BIT,
351 struct io_tctx_node {
352 struct list_head ctx_node;
353 struct task_struct *task;
354 struct io_ring_ctx *ctx;
357 struct io_defer_entry {
358 struct list_head list;
359 struct io_kiocb *req;
364 /* needs req->file assigned */
365 unsigned needs_file : 1;
366 /* should block plug */
367 unsigned plug : 1;
368 /* hash wq insertion if file is a regular file */
369 unsigned hash_reg_file : 1;
370 /* unbound wq insertion if file is a non-regular file */
371 unsigned unbound_nonreg_file : 1;
372 /* set if opcode supports polled "wait" */
373 unsigned pollin : 1;
374 unsigned pollout : 1;
375 unsigned poll_exclusive : 1;
376 /* op supports buffer selection */
377 unsigned buffer_select : 1;
378 /* opcode is not supported by this kernel */
379 unsigned not_supported : 1;
381 unsigned audit_skip : 1;
382 /* supports ioprio */
383 unsigned ioprio : 1;
384 /* supports iopoll */
385 unsigned iopoll : 1;
386 /* size of async data needed, if any */
387 unsigned short async_size;
389 int (*prep)(struct io_kiocb *, const struct io_uring_sqe *);
390 int (*issue)(struct io_kiocb *, unsigned int);
391 int (*prep_async)(struct io_kiocb *);
392 void (*cleanup)(struct io_kiocb *);
395 static const struct io_op_def io_op_defs[];
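/*
 * Purely illustrative: a sketch of how the fields above combine for one
 * read-style opcode. The real io_op_defs[] initializer appears further down
 * in this file and is authoritative; the handler names and flag choices here
 * are an approximation, not a definition:
 *
 *	[IORING_OP_READV] = {
 *		.needs_file		= 1,
 *		.unbound_nonreg_file	= 1,
 *		.pollin			= 1,
 *		.buffer_select		= 1,
 *		.plug			= 1,
 *		.audit_skip		= 1,
 *		.ioprio			= 1,
 *		.iopoll			= 1,
 *		.async_size		= sizeof(struct io_async_rw),
 *		.prep			= io_prep_rw,
 *		.issue			= io_read,
 *		.prep_async		= io_readv_prep_async,
 *		.cleanup		= io_readv_writev_cleanup,
 *	},
 */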
397 /* requests with any of those set should undergo io_disarm_next() */
398 #define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
399 #define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
401 static void io_uring_del_tctx_node(unsigned long index);
402 static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
403 struct task_struct *task,
405 static void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
407 static void io_dismantle_req(struct io_kiocb *req);
408 static int __io_register_rsrc_update(struct io_ring_ctx *ctx, unsigned type,
409 struct io_uring_rsrc_update2 *up,
411 static void io_clean_op(struct io_kiocb *req);
412 static void io_queue_sqe(struct io_kiocb *req);
413 static void io_rsrc_put_work(struct work_struct *work);
415 static void io_req_task_queue(struct io_kiocb *req);
416 static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
417 static int io_req_prep_async(struct io_kiocb *req);
419 static void io_eventfd_signal(struct io_ring_ctx *ctx);
421 static struct kmem_cache *req_cachep;
423 static const struct file_operations io_uring_fops;
425 const char *io_uring_get_opcode(u8 opcode)
427 switch ((enum io_uring_op)opcode) {
430 case IORING_OP_READV:
432 case IORING_OP_WRITEV:
434 case IORING_OP_FSYNC:
436 case IORING_OP_READ_FIXED:
438 case IORING_OP_WRITE_FIXED:
439 return "WRITE_FIXED";
440 case IORING_OP_POLL_ADD:
442 case IORING_OP_POLL_REMOVE:
443 return "POLL_REMOVE";
444 case IORING_OP_SYNC_FILE_RANGE:
445 return "SYNC_FILE_RANGE";
446 case IORING_OP_SENDMSG:
448 case IORING_OP_RECVMSG:
450 case IORING_OP_TIMEOUT:
452 case IORING_OP_TIMEOUT_REMOVE:
453 return "TIMEOUT_REMOVE";
454 case IORING_OP_ACCEPT:
456 case IORING_OP_ASYNC_CANCEL:
457 return "ASYNC_CANCEL";
458 case IORING_OP_LINK_TIMEOUT:
459 return "LINK_TIMEOUT";
460 case IORING_OP_CONNECT:
462 case IORING_OP_FALLOCATE:
464 case IORING_OP_OPENAT:
466 case IORING_OP_CLOSE:
468 case IORING_OP_FILES_UPDATE:
469 return "FILES_UPDATE";
470 case IORING_OP_STATX:
474 case IORING_OP_WRITE:
476 case IORING_OP_FADVISE:
478 case IORING_OP_MADVISE:
484 case IORING_OP_OPENAT2:
486 case IORING_OP_EPOLL_CTL:
488 case IORING_OP_SPLICE:
490 case IORING_OP_PROVIDE_BUFFERS:
491 return "PROVIDE_BUFFERS";
492 case IORING_OP_REMOVE_BUFFERS:
493 return "REMOVE_BUFFERS";
496 case IORING_OP_SHUTDOWN:
498 case IORING_OP_RENAMEAT:
500 case IORING_OP_UNLINKAT:
502 case IORING_OP_MKDIRAT:
504 case IORING_OP_SYMLINKAT:
506 case IORING_OP_LINKAT:
508 case IORING_OP_MSG_RING:
510 case IORING_OP_FSETXATTR:
512 case IORING_OP_SETXATTR:
514 case IORING_OP_FGETXATTR:
516 case IORING_OP_GETXATTR:
518 case IORING_OP_SOCKET:
520 case IORING_OP_URING_CMD:
528 bool io_is_uring_fops(struct file *file)
530 return file->f_op == &io_uring_fops;
533 struct sock *io_uring_get_socket(struct file *file)
535 #if defined(CONFIG_UNIX)
536 if (io_is_uring_fops(file)) {
537 struct io_ring_ctx *ctx = file->private_data;
539 return ctx->ring_sock->sk;
544 EXPORT_SYMBOL(io_uring_get_socket);
546 #if defined(CONFIG_UNIX)
547 static inline bool io_file_need_scm(struct file *filp)
549 #if defined(IO_URING_SCM_ALL)
552 return !!unix_get_socket(filp);
556 static inline bool io_file_need_scm(struct file *filp)
562 static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
565 mutex_lock(&ctx->uring_lock);
570 static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
572 if (!wq_list_empty(&ctx->submit_state.compl_reqs))
573 __io_submit_flush_completions(ctx);
576 #define IO_RSRC_REF_BATCH 100
578 static void io_rsrc_put_node(struct io_rsrc_node *node, int nr)
580 percpu_ref_put_many(&node->refs, nr);
583 static inline void io_req_put_rsrc_locked(struct io_kiocb *req,
584 struct io_ring_ctx *ctx)
585 __must_hold(&ctx->uring_lock)
587 struct io_rsrc_node *node = req->rsrc_node;
590 if (node == ctx->rsrc_node)
591 ctx->rsrc_cached_refs++;
593 io_rsrc_put_node(node, 1);
597 static inline void io_req_put_rsrc(struct io_kiocb *req)
600 io_rsrc_put_node(req->rsrc_node, 1);
603 static __cold void io_rsrc_refs_drop(struct io_ring_ctx *ctx)
604 __must_hold(&ctx->uring_lock)
606 if (ctx->rsrc_cached_refs) {
607 io_rsrc_put_node(ctx->rsrc_node, ctx->rsrc_cached_refs);
608 ctx->rsrc_cached_refs = 0;
612 static void io_rsrc_refs_refill(struct io_ring_ctx *ctx)
613 __must_hold(&ctx->uring_lock)
615 ctx->rsrc_cached_refs += IO_RSRC_REF_BATCH;
616 percpu_ref_get_many(&ctx->rsrc_node->refs, IO_RSRC_REF_BATCH);
619 static inline void io_req_set_rsrc_node(struct io_kiocb *req,
620 struct io_ring_ctx *ctx,
621 unsigned int issue_flags)
623 if (!req->rsrc_node) {
624 req->rsrc_node = ctx->rsrc_node;
626 if (!(issue_flags & IO_URING_F_UNLOCKED)) {
627 lockdep_assert_held(&ctx->uring_lock);
628 ctx->rsrc_cached_refs--;
629 if (unlikely(ctx->rsrc_cached_refs < 0))
630 io_rsrc_refs_refill(ctx);
632 percpu_ref_get(&req->rsrc_node->refs);
637 static unsigned int __io_put_kbuf(struct io_kiocb *req, struct list_head *list)
639 if (req->flags & REQ_F_BUFFER_RING) {
641 req->buf_list->head++;
642 req->flags &= ~REQ_F_BUFFER_RING;
644 list_add(&req->kbuf->list, list);
645 req->flags &= ~REQ_F_BUFFER_SELECTED;
648 return IORING_CQE_F_BUFFER | (req->buf_index << IORING_CQE_BUFFER_SHIFT);
651 static inline unsigned int io_put_kbuf_comp(struct io_kiocb *req)
653 lockdep_assert_held(&req->ctx->completion_lock);
655 if (!(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
657 return __io_put_kbuf(req, &req->ctx->io_buffers_comp);
660 inline unsigned int io_put_kbuf(struct io_kiocb *req, unsigned issue_flags)
664 if (!(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
668 * We can add this buffer back to two lists:
670 * 1) The io_buffers_cache list. This one is protected by the
671 * ctx->uring_lock. If we already hold this lock, add back to this
672 * list as we can grab it from issue as well.
673 * 2) The io_buffers_comp list. This one is protected by the
674 * ctx->completion_lock.
676 * We migrate buffers from the comp_list to the issue cache list
677 * when we need a new buffer; see the illustrative sketch after this function.
678 */
679 if (req->flags & REQ_F_BUFFER_RING) {
680 /* no buffers to recycle for this case */
681 cflags = __io_put_kbuf(req, NULL);
682 } else if (issue_flags & IO_URING_F_UNLOCKED) {
683 struct io_ring_ctx *ctx = req->ctx;
685 spin_lock(&ctx->completion_lock);
686 cflags = __io_put_kbuf(req, &ctx->io_buffers_comp);
687 spin_unlock(&ctx->completion_lock);
689 lockdep_assert_held(&req->ctx->uring_lock);
691 cflags = __io_put_kbuf(req, &req->ctx->io_buffers_cache);
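/*
 * Illustrative sketch only: when the issue side later needs a buffer and
 * finds io_buffers_cache empty, it can migrate everything that completions
 * parked on io_buffers_comp in one go, roughly (hypothetical helper, called
 * with ctx->uring_lock held):
 *
 *	spin_lock(&ctx->completion_lock);
 *	list_splice_init(&ctx->io_buffers_comp, &ctx->io_buffers_cache);
 *	spin_unlock(&ctx->completion_lock);
 *
 * which is why a completed buffer only needs to land on whichever of the two
 * lists matches the lock the completion path already holds.
 */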
697 static struct io_buffer_list *io_buffer_get_list(struct io_ring_ctx *ctx,
700 if (ctx->io_bl && bgid < BGID_ARRAY)
701 return &ctx->io_bl[bgid];
703 return xa_load(&ctx->io_bl_xa, bgid);
706 static void io_kbuf_recycle(struct io_kiocb *req, unsigned issue_flags)
708 struct io_ring_ctx *ctx = req->ctx;
709 struct io_buffer_list *bl;
710 struct io_buffer *buf;
712 if (!(req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)))
715 * For legacy provided buffer mode, don't recycle if we already did
716 * IO to this buffer. For ring-mapped provided buffer mode, we should
717 * increment ring->head to explicitly monopolize the buffer to avoid
718 * multiple use; see the note after this function.
719 */
720 if ((req->flags & REQ_F_BUFFER_SELECTED) &&
721 (req->flags & REQ_F_PARTIAL_IO))
725 * READV uses fields in `struct io_rw` (len/addr) to stash the selected
726 * buffer data. However if that buffer is recycled the original request
727 * data stored in addr is lost. Therefore forbid recycling for now.
729 if (req->opcode == IORING_OP_READV)
733 * We don't need to recycle for REQ_F_BUFFER_RING, we can just clear
734 * the flag and hence ensure that bl->head doesn't get incremented.
735 * If the tail has already been incremented, hang on to it.
737 if (req->flags & REQ_F_BUFFER_RING) {
739 if (req->flags & REQ_F_PARTIAL_IO) {
740 req->buf_list->head++;
741 req->buf_list = NULL;
743 req->buf_index = req->buf_list->bgid;
744 req->flags &= ~REQ_F_BUFFER_RING;
750 io_ring_submit_lock(ctx, issue_flags);
753 bl = io_buffer_get_list(ctx, buf->bgid);
754 list_add(&buf->list, &bl->buf_list);
755 req->flags &= ~REQ_F_BUFFER_SELECTED;
756 req->buf_index = buf->bgid;
758 io_ring_submit_unlock(ctx, issue_flags);
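/*
 * For reference (illustration only, userspace side): with ring mapped
 * provided buffers the application publishes entries by filling a
 * struct io_uring_buf slot and then advancing the ring tail, e.g.:
 *
 *	struct io_uring_buf_ring *br;		// registered/mmap'ed buffer ring
 *	unsigned short tail, mask;		// app-local tail, ring_entries - 1
 *
 *	br->bufs[tail & mask].addr = (unsigned long)buf;
 *	br->bufs[tail & mask].len  = buf_len;
 *	br->bufs[tail & mask].bid  = bid;
 *	smp_store_release(&br->tail, tail + 1);
 *
 * The kernel consumes entries by advancing bl->head, which is exactly what
 * the REQ_F_PARTIAL_IO case above commits; clearing REQ_F_BUFFER_RING without
 * touching head leaves the buffer available for the next request.
 */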
761 static bool io_match_linked(struct io_kiocb *head)
763 struct io_kiocb *req;
765 io_for_each_link(req, head) {
766 if (req->flags & REQ_F_INFLIGHT)
773 * As io_match_task() but protected against racing with linked timeouts.
774 * User must not hold timeout_lock.
776 static bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
781 if (task && head->task != task)
786 if (head->flags & REQ_F_LINK_TIMEOUT) {
787 struct io_ring_ctx *ctx = head->ctx;
789 /* protect against races with linked timeouts */
790 spin_lock_irq(&ctx->timeout_lock);
791 matched = io_match_linked(head);
792 spin_unlock_irq(&ctx->timeout_lock);
794 matched = io_match_linked(head);
799 static inline void req_fail_link_node(struct io_kiocb *req, int res)
802 io_req_set_res(req, res, 0);
805 static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx)
807 wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list);
810 static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
812 struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
814 complete(&ctx->ref_comp);
817 static __cold void io_fallback_req_func(struct work_struct *work)
819 struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
821 struct llist_node *node = llist_del_all(&ctx->fallback_llist);
822 struct io_kiocb *req, *tmp;
825 percpu_ref_get(&ctx->refs);
826 llist_for_each_entry_safe(req, tmp, node, io_task_work.fallback_node)
827 req->io_task_work.func(req, &locked);
830 io_submit_flush_completions(ctx);
831 mutex_unlock(&ctx->uring_lock);
833 percpu_ref_put(&ctx->refs);
836 static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
838 struct io_ring_ctx *ctx;
841 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
845 xa_init(&ctx->io_bl_xa);
848 * Use 5 bits less than the max cq entries, that should give us around
849 * 32 entries per hash list if totally full and uniformly spread.
850 */
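/*
 * e.g. (illustrative numbers only): cq_entries == 4096 gives ilog2() == 12,
 * minus 5 == 7 bits, so 128 hash lists; a completely full set of 4096
 * pending entries then averages 4096 / 128 == 32 per list.
 */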
851 hash_bits = ilog2(p->cq_entries);
852 hash_bits -= 5;
853 if (hash_bits <= 0)
854 hash_bits = 1;
855 ctx->cancel_hash_bits = hash_bits;
856 ctx->cancel_hash = kmalloc((1U << hash_bits) * sizeof(struct hlist_head),
857 GFP_KERNEL);
858 if (!ctx->cancel_hash)
860 __hash_init(ctx->cancel_hash, 1U << hash_bits);
862 ctx->dummy_ubuf = kzalloc(sizeof(*ctx->dummy_ubuf), GFP_KERNEL);
863 if (!ctx->dummy_ubuf)
865 /* set invalid range, so io_import_fixed() fails meeting it */
866 ctx->dummy_ubuf->ubuf = -1UL;
868 if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
869 PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
872 ctx->flags = p->flags;
873 init_waitqueue_head(&ctx->sqo_sq_wait);
874 INIT_LIST_HEAD(&ctx->sqd_list);
875 INIT_LIST_HEAD(&ctx->cq_overflow_list);
876 INIT_LIST_HEAD(&ctx->io_buffers_cache);
877 INIT_LIST_HEAD(&ctx->apoll_cache);
878 init_completion(&ctx->ref_comp);
879 xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
880 mutex_init(&ctx->uring_lock);
881 init_waitqueue_head(&ctx->cq_wait);
882 spin_lock_init(&ctx->completion_lock);
883 spin_lock_init(&ctx->timeout_lock);
884 INIT_WQ_LIST(&ctx->iopoll_list);
885 INIT_LIST_HEAD(&ctx->io_buffers_pages);
886 INIT_LIST_HEAD(&ctx->io_buffers_comp);
887 INIT_LIST_HEAD(&ctx->defer_list);
888 INIT_LIST_HEAD(&ctx->timeout_list);
889 INIT_LIST_HEAD(&ctx->ltimeout_list);
890 spin_lock_init(&ctx->rsrc_ref_lock);
891 INIT_LIST_HEAD(&ctx->rsrc_ref_list);
892 INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
893 init_llist_head(&ctx->rsrc_put_llist);
894 INIT_LIST_HEAD(&ctx->tctx_list);
895 ctx->submit_state.free_list.next = NULL;
896 INIT_WQ_LIST(&ctx->locked_free_list);
897 INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
898 INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
901 kfree(ctx->dummy_ubuf);
902 kfree(ctx->cancel_hash);
904 xa_destroy(&ctx->io_bl_xa);
909 static void io_account_cq_overflow(struct io_ring_ctx *ctx)
911 struct io_rings *r = ctx->rings;
913 WRITE_ONCE(r->cq_overflow, READ_ONCE(r->cq_overflow) + 1);
917 static bool req_need_defer(struct io_kiocb *req, u32 seq)
919 if (unlikely(req->flags & REQ_F_IO_DRAIN)) {
920 struct io_ring_ctx *ctx = req->ctx;
922 return seq + READ_ONCE(ctx->cq_extra) != ctx->cached_cq_tail;
928 static inline bool io_req_ffs_set(struct io_kiocb *req)
930 return req->flags & REQ_F_FIXED_FILE;
933 static inline void io_req_track_inflight(struct io_kiocb *req)
935 if (!(req->flags & REQ_F_INFLIGHT)) {
936 req->flags |= REQ_F_INFLIGHT;
937 atomic_inc(&req->task->io_uring->inflight_tracked);
941 static struct io_kiocb *__io_prep_linked_timeout(struct io_kiocb *req)
943 if (WARN_ON_ONCE(!req->link))
946 req->flags &= ~REQ_F_ARM_LTIMEOUT;
947 req->flags |= REQ_F_LINK_TIMEOUT;
949 /* linked timeouts should have two refs once prep'ed */
950 io_req_set_refcount(req);
951 __io_req_set_refcount(req->link, 2);
955 static inline struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
957 if (likely(!(req->flags & REQ_F_ARM_LTIMEOUT)))
959 return __io_prep_linked_timeout(req);
962 static noinline void __io_arm_ltimeout(struct io_kiocb *req)
964 io_queue_linked_timeout(__io_prep_linked_timeout(req));
967 static inline void io_arm_ltimeout(struct io_kiocb *req)
969 if (unlikely(req->flags & REQ_F_ARM_LTIMEOUT))
970 __io_arm_ltimeout(req);
973 static void io_prep_async_work(struct io_kiocb *req)
975 const struct io_op_def *def = &io_op_defs[req->opcode];
976 struct io_ring_ctx *ctx = req->ctx;
978 if (!(req->flags & REQ_F_CREDS)) {
979 req->flags |= REQ_F_CREDS;
980 req->creds = get_current_cred();
983 req->work.list.next = NULL;
985 req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
986 if (req->flags & REQ_F_FORCE_ASYNC)
987 req->work.flags |= IO_WQ_WORK_CONCURRENT;
989 if (req->flags & REQ_F_ISREG) {
990 if (def->hash_reg_file || (ctx->flags & IORING_SETUP_IOPOLL))
991 io_wq_hash_work(&req->work, file_inode(req->file));
992 } else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
993 if (def->unbound_nonreg_file)
994 req->work.flags |= IO_WQ_WORK_UNBOUND;
998 static void io_prep_async_link(struct io_kiocb *req)
1000 struct io_kiocb *cur;
1002 if (req->flags & REQ_F_LINK_TIMEOUT) {
1003 struct io_ring_ctx *ctx = req->ctx;
1005 spin_lock_irq(&ctx->timeout_lock);
1006 io_for_each_link(cur, req)
1007 io_prep_async_work(cur);
1008 spin_unlock_irq(&ctx->timeout_lock);
1010 io_for_each_link(cur, req)
1011 io_prep_async_work(cur);
1015 static inline void io_req_add_compl_list(struct io_kiocb *req)
1017 struct io_submit_state *state = &req->ctx->submit_state;
1019 if (!(req->flags & REQ_F_CQE_SKIP))
1020 state->flush_cqes = true;
1021 wq_list_add_tail(&req->comp_list, &state->compl_reqs);
1024 static void io_queue_iowq(struct io_kiocb *req, bool *dont_use)
1026 struct io_kiocb *link = io_prep_linked_timeout(req);
1027 struct io_uring_task *tctx = req->task->io_uring;
1030 BUG_ON(!tctx->io_wq);
1032 /* init ->work of the whole link before punting */
1033 io_prep_async_link(req);
1036 * Not expected to happen, but if we do have a bug where this _can_
1037 * happen, catch it here and ensure the request is marked as
1038 * canceled. That will make io-wq go through the usual work cancel
1039 * procedure rather than attempt to run this request (or create a new
1040 * worker for it).
1041 */
1042 if (WARN_ON_ONCE(!same_thread_group(req->task, current)))
1043 req->work.flags |= IO_WQ_WORK_CANCEL;
1045 trace_io_uring_queue_async_work(req->ctx, req, req->cqe.user_data,
1046 req->opcode, req->flags, &req->work,
1047 io_wq_is_hashed(&req->work));
1048 io_wq_enqueue(tctx->io_wq, &req->work);
1050 io_queue_linked_timeout(link);
1053 static __cold void io_queue_deferred(struct io_ring_ctx *ctx)
1055 while (!list_empty(&ctx->defer_list)) {
1056 struct io_defer_entry *de = list_first_entry(&ctx->defer_list,
1057 struct io_defer_entry, list);
1059 if (req_need_defer(de->req, de->seq))
1061 list_del_init(&de->list);
1062 io_req_task_queue(de->req);
1067 static void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
1069 if (ctx->off_timeout_used || ctx->drain_active) {
1070 spin_lock(&ctx->completion_lock);
1071 if (ctx->off_timeout_used)
1072 io_flush_timeouts(ctx);
1073 if (ctx->drain_active)
1074 io_queue_deferred(ctx);
1075 io_commit_cqring(ctx);
1076 spin_unlock(&ctx->completion_lock);
1079 io_eventfd_signal(ctx);
1082 static inline bool io_sqring_full(struct io_ring_ctx *ctx)
1084 struct io_rings *r = ctx->rings;
1086 return READ_ONCE(r->sq.tail) - ctx->cached_sq_head == ctx->sq_entries;
1089 static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
1091 return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
1094 /*
1095 * writes to the cq entry need to come after reading head; the
1096 * control dependency is enough as we're using WRITE_ONCE to
1097 * fill the cq entry
1098 */
1099 static noinline struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx)
1101 struct io_rings *rings = ctx->rings;
1102 unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
1103 unsigned int shift = 0;
1104 unsigned int free, queued, len;
1106 if (ctx->flags & IORING_SETUP_CQE32)
1109 /* userspace may cheat modifying the tail, be safe and do min */
1110 queued = min(__io_cqring_events(ctx), ctx->cq_entries);
1111 free = ctx->cq_entries - queued;
1112 /* we need a contiguous range, limit based on the current array offset */
1113 len = min(free, ctx->cq_entries - off);
1117 ctx->cached_cq_tail++;
1118 ctx->cqe_cached = &rings->cqes[off];
1119 ctx->cqe_sentinel = ctx->cqe_cached + len;
1121 return &rings->cqes[off << shift];
1124 static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
1126 if (likely(ctx->cqe_cached < ctx->cqe_sentinel)) {
1127 struct io_uring_cqe *cqe = ctx->cqe_cached;
1129 if (ctx->flags & IORING_SETUP_CQE32) {
1130 unsigned int off = ctx->cqe_cached - ctx->rings->cqes;
1135 ctx->cached_cq_tail++;
1140 return __io_get_cqe(ctx);
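/*
 * Illustration (not authoritative): ->cqes[] is laid out in units of the
 * regular 16 byte CQE, so with IORING_SETUP_CQE32 one posted CQE spans two
 * consecutive slots and the index is doubled ("off << shift" above). E.g.
 * with cq_entries == 8 and cached_cq_tail == 10, off == 2 and a big CQE is
 * written at &rings->cqes[4], covering slots 4 and 5. The cqe_cached /
 * cqe_sentinel window simply lets io_get_cqe() hand out the following
 * entries of that contiguous range without redoing the ring math each time.
 */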
1143 static void io_eventfd_signal(struct io_ring_ctx *ctx)
1145 struct io_ev_fd *ev_fd;
1149 * rcu_dereference ctx->io_ev_fd once and use it for both for checking
1150 * and eventfd_signal
1152 ev_fd = rcu_dereference(ctx->io_ev_fd);
1155 * Check again if ev_fd exists in case an io_eventfd_unregister call
1156 * completed between the NULL check of ctx->io_ev_fd at the start of
1157 * the function and rcu_read_lock.
1159 if (unlikely(!ev_fd))
1161 if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED)
1164 if (!ev_fd->eventfd_async || io_wq_current_is_worker())
1165 eventfd_signal(ev_fd->cq_ev_fd, 1);
1170 static inline void io_cqring_wake(struct io_ring_ctx *ctx)
1173 * wake_up_all() may seem excessive, but io_wake_function() and
1174 * io_should_wake() handle the termination of the loop and only
1175 * wake as many waiters as we need to.
1177 if (wq_has_sleeper(&ctx->cq_wait))
1178 wake_up_all(&ctx->cq_wait);
1182 * This should only get called when at least one event has been posted.
1183 * Some applications rely on the eventfd notification count only changing
1184 * IFF a new CQE has been added to the CQ ring. There's no dependency on a
1185 * 1:1 relationship between how many times this function is called (and
1186 * hence the eventfd count) and number of CQEs posted to the CQ ring.
1188 void io_cqring_ev_posted(struct io_ring_ctx *ctx)
1190 if (unlikely(ctx->off_timeout_used || ctx->drain_active ||
1192 __io_commit_cqring_flush(ctx);
1194 io_cqring_wake(ctx);
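/*
 * Illustration only: because of the above, an application that registered an
 * eventfd with IORING_REGISTER_EVENTFD should treat the counter as a "there
 * may be new completions" hint and drain every available CQE per wakeup,
 * e.g. (peek_cqe()/advance_cq_head() are stand-ins for the ring accessors):
 *
 *	uint64_t cnt;
 *
 *	read(evfd, &cnt, sizeof(cnt));		// one read may cover many CQEs
 *	while (peek_cqe(&ring, &cqe) == 0) {
 *		handle_completion(cqe);
 *		advance_cq_head(&ring);
 *	}
 *
 * rather than assuming one counter increment per posted CQE.
 */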
1197 static void io_cqring_ev_posted_iopoll(struct io_ring_ctx *ctx)
1199 if (unlikely(ctx->off_timeout_used || ctx->drain_active ||
1201 __io_commit_cqring_flush(ctx);
1203 if (ctx->flags & IORING_SETUP_SQPOLL)
1204 io_cqring_wake(ctx);
1207 /* Returns true if there are no backlogged entries after the flush */
1208 static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
1210 bool all_flushed, posted;
1211 size_t cqe_size = sizeof(struct io_uring_cqe);
1213 if (!force && __io_cqring_events(ctx) == ctx->cq_entries)
1216 if (ctx->flags & IORING_SETUP_CQE32)
1220 spin_lock(&ctx->completion_lock);
1221 while (!list_empty(&ctx->cq_overflow_list)) {
1222 struct io_uring_cqe *cqe = io_get_cqe(ctx);
1223 struct io_overflow_cqe *ocqe;
1227 ocqe = list_first_entry(&ctx->cq_overflow_list,
1228 struct io_overflow_cqe, list);
1230 memcpy(cqe, &ocqe->cqe, cqe_size);
1232 io_account_cq_overflow(ctx);
1235 list_del(&ocqe->list);
1239 all_flushed = list_empty(&ctx->cq_overflow_list);
1241 clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
1242 atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
1245 io_commit_cqring(ctx);
1246 spin_unlock(&ctx->completion_lock);
1248 io_cqring_ev_posted(ctx);
1252 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx)
1256 if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
1257 /* iopoll syncs against uring_lock, not completion_lock */
1258 if (ctx->flags & IORING_SETUP_IOPOLL)
1259 mutex_lock(&ctx->uring_lock);
1260 ret = __io_cqring_overflow_flush(ctx, false);
1261 if (ctx->flags & IORING_SETUP_IOPOLL)
1262 mutex_unlock(&ctx->uring_lock);
1268 static void __io_put_task(struct task_struct *task, int nr)
1270 struct io_uring_task *tctx = task->io_uring;
1272 percpu_counter_sub(&tctx->inflight, nr);
1273 if (unlikely(atomic_read(&tctx->in_idle)))
1274 wake_up(&tctx->wait);
1275 put_task_struct_many(task, nr);
1278 /* must be called somewhat shortly after putting a request */
1279 static inline void io_put_task(struct task_struct *task, int nr)
1281 if (likely(task == current))
1282 task->io_uring->cached_refs += nr;
1284 __io_put_task(task, nr);
1287 static void io_task_refs_refill(struct io_uring_task *tctx)
1289 unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;
1291 percpu_counter_add(&tctx->inflight, refill);
1292 refcount_add(refill, &current->usage);
1293 tctx->cached_refs += refill;
1296 static inline void io_get_task_refs(int nr)
1298 struct io_uring_task *tctx = current->io_uring;
1300 tctx->cached_refs -= nr;
1301 if (unlikely(tctx->cached_refs < 0))
1302 io_task_refs_refill(tctx);
1305 static __cold void io_uring_drop_tctx_refs(struct task_struct *task)
1307 struct io_uring_task *tctx = task->io_uring;
1308 unsigned int refs = tctx->cached_refs;
1311 tctx->cached_refs = 0;
1312 percpu_counter_sub(&tctx->inflight, refs);
1313 put_task_struct_many(task, refs);
1317 static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
1318 s32 res, u32 cflags, u64 extra1,
1321 struct io_overflow_cqe *ocqe;
1322 size_t ocq_size = sizeof(struct io_overflow_cqe);
1323 bool is_cqe32 = (ctx->flags & IORING_SETUP_CQE32);
1326 ocq_size += sizeof(struct io_uring_cqe);
1328 ocqe = kmalloc(ocq_size, GFP_ATOMIC | __GFP_ACCOUNT);
1329 trace_io_uring_cqe_overflow(ctx, user_data, res, cflags, ocqe);
1332 * If we're in ring overflow flush mode, or in task cancel mode,
1333 * or cannot allocate an overflow entry, then we need to drop it
1334 * on the floor.
1335 */
1336 io_account_cq_overflow(ctx);
1337 set_bit(IO_CHECK_CQ_DROPPED_BIT, &ctx->check_cq);
1340 if (list_empty(&ctx->cq_overflow_list)) {
1341 set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
1342 atomic_or(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
1345 ocqe->cqe.user_data = user_data;
1346 ocqe->cqe.res = res;
1347 ocqe->cqe.flags = cflags;
1349 ocqe->cqe.big_cqe[0] = extra1;
1350 ocqe->cqe.big_cqe[1] = extra2;
1352 list_add_tail(&ocqe->list, &ctx->cq_overflow_list);
1356 static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
1357 struct io_kiocb *req)
1359 struct io_uring_cqe *cqe;
1361 if (!(ctx->flags & IORING_SETUP_CQE32)) {
1362 trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
1363 req->cqe.res, req->cqe.flags, 0, 0);
1366 * If we can't get a cq entry, userspace overflowed the
1367 * submission (by quite a lot). Increment the overflow count in
1368 * the ring.
1369 */
1370 cqe = io_get_cqe(ctx);
1372 memcpy(cqe, &req->cqe, sizeof(*cqe));
1376 return io_cqring_event_overflow(ctx, req->cqe.user_data,
1377 req->cqe.res, req->cqe.flags,
1380 u64 extra1 = 0, extra2 = 0;
1382 if (req->flags & REQ_F_CQE32_INIT) {
1383 extra1 = req->extra1;
1384 extra2 = req->extra2;
1387 trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
1388 req->cqe.res, req->cqe.flags, extra1, extra2);
1391 * If we can't get a cq entry, userspace overflowed the
1392 * submission (by quite a lot). Increment the overflow count in
1393 * the ring.
1394 */
1395 cqe = io_get_cqe(ctx);
1397 memcpy(cqe, &req->cqe, sizeof(struct io_uring_cqe));
1398 WRITE_ONCE(cqe->big_cqe[0], extra1);
1399 WRITE_ONCE(cqe->big_cqe[1], extra2);
1403 return io_cqring_event_overflow(ctx, req->cqe.user_data,
1404 req->cqe.res, req->cqe.flags,
1409 bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
1412 struct io_uring_cqe *cqe;
1415 trace_io_uring_complete(ctx, NULL, user_data, res, cflags, 0, 0);
1418 * If we can't get a cq entry, userspace overflowed the
1419 * submission (by quite a lot). Increment the overflow count in
1420 * the ring.
1421 */
1422 cqe = io_get_cqe(ctx);
1424 WRITE_ONCE(cqe->user_data, user_data);
1425 WRITE_ONCE(cqe->res, res);
1426 WRITE_ONCE(cqe->flags, cflags);
1428 if (ctx->flags & IORING_SETUP_CQE32) {
1429 WRITE_ONCE(cqe->big_cqe[0], 0);
1430 WRITE_ONCE(cqe->big_cqe[1], 0);
1434 return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
1437 static void __io_req_complete_put(struct io_kiocb *req)
1440 * If we're the last reference to this request, add to our locked
1441 * free_list cache.
1442 */
1443 if (req_ref_put_and_test(req)) {
1444 struct io_ring_ctx *ctx = req->ctx;
1446 if (req->flags & IO_REQ_LINK_FLAGS) {
1447 if (req->flags & IO_DISARM_MASK)
1448 io_disarm_next(req);
1450 io_req_task_queue(req->link);
1454 io_req_put_rsrc(req);
1456 * Selected buffer deallocation in io_clean_op() assumes that
1457 * we don't hold ->completion_lock. Clean them here to avoid
1458 * deadlocks.
1459 */
1460 io_put_kbuf_comp(req);
1461 io_dismantle_req(req);
1462 io_put_task(req->task, 1);
1463 wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
1464 ctx->locked_free_nr++;
1468 void __io_req_complete_post(struct io_kiocb *req)
1470 if (!(req->flags & REQ_F_CQE_SKIP))
1471 __io_fill_cqe_req(req->ctx, req);
1472 __io_req_complete_put(req);
1475 void io_req_complete_post(struct io_kiocb *req)
1477 struct io_ring_ctx *ctx = req->ctx;
1479 spin_lock(&ctx->completion_lock);
1480 __io_req_complete_post(req);
1481 io_commit_cqring(ctx);
1482 spin_unlock(&ctx->completion_lock);
1483 io_cqring_ev_posted(ctx);
1486 inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags)
1488 if (issue_flags & IO_URING_F_COMPLETE_DEFER)
1489 req->flags |= REQ_F_COMPLETE_INLINE;
1491 io_req_complete_post(req);
1494 static void io_req_complete_failed(struct io_kiocb *req, s32 res)
1497 io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
1498 io_req_complete_post(req);
1502 * Don't initialise the fields below on every allocation, but do that in
1503 * advance and keep them valid across allocations.
1505 static void io_preinit_req(struct io_kiocb *req, struct io_ring_ctx *ctx)
1509 req->async_data = NULL;
1510 /* not necessary, but safer to zero */
1514 static void io_flush_cached_locked_reqs(struct io_ring_ctx *ctx,
1515 struct io_submit_state *state)
1517 spin_lock(&ctx->completion_lock);
1518 wq_list_splice(&ctx->locked_free_list, &state->free_list);
1519 ctx->locked_free_nr = 0;
1520 spin_unlock(&ctx->completion_lock);
1523 static inline bool io_req_cache_empty(struct io_ring_ctx *ctx)
1525 return !ctx->submit_state.free_list.next;
1529 * A request might get retired back into the request caches even before opcode
1530 * handlers and io_issue_sqe() are done with it, e.g. inline completion path.
1531 * Because of that, io_alloc_req() should be called only under ->uring_lock
1532 * and with extra caution to not get a request that is still worked on.
1534 static __cold bool __io_alloc_req_refill(struct io_ring_ctx *ctx)
1535 __must_hold(&ctx->uring_lock)
1537 gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
1538 void *reqs[IO_REQ_ALLOC_BATCH];
1542 * If we have more than a batch's worth of requests in our IRQ side
1543 * locked cache, grab the lock and move them over to our submission
1544 * side cache.
1545 */
1546 if (data_race(ctx->locked_free_nr) > IO_COMPL_BATCH) {
1547 io_flush_cached_locked_reqs(ctx, &ctx->submit_state);
1548 if (!io_req_cache_empty(ctx))
1552 ret = kmem_cache_alloc_bulk(req_cachep, gfp, ARRAY_SIZE(reqs), reqs);
1555 * Bulk alloc is all-or-nothing. If we fail to get a batch,
1556 * retry single alloc to be on the safe side.
1558 if (unlikely(ret <= 0)) {
1559 reqs[0] = kmem_cache_alloc(req_cachep, gfp);
1565 percpu_ref_get_many(&ctx->refs, ret);
1566 for (i = 0; i < ret; i++) {
1567 struct io_kiocb *req = reqs[i];
1569 io_preinit_req(req, ctx);
1570 io_req_add_to_cache(req, ctx);
1575 static inline bool io_alloc_req_refill(struct io_ring_ctx *ctx)
1577 if (unlikely(io_req_cache_empty(ctx)))
1578 return __io_alloc_req_refill(ctx);
1582 static inline struct io_kiocb *io_alloc_req(struct io_ring_ctx *ctx)
1584 struct io_wq_work_node *node;
1586 node = wq_stack_extract(&ctx->submit_state.free_list);
1587 return container_of(node, struct io_kiocb, comp_list);
1590 static inline void io_dismantle_req(struct io_kiocb *req)
1592 unsigned int flags = req->flags;
1594 if (unlikely(flags & IO_REQ_CLEAN_FLAGS))
1596 if (!(flags & REQ_F_FIXED_FILE))
1597 io_put_file(req->file);
1600 __cold void io_free_req(struct io_kiocb *req)
1602 struct io_ring_ctx *ctx = req->ctx;
1604 io_req_put_rsrc(req);
1605 io_dismantle_req(req);
1606 io_put_task(req->task, 1);
1608 spin_lock(&ctx->completion_lock);
1609 wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
1610 ctx->locked_free_nr++;
1611 spin_unlock(&ctx->completion_lock);
1614 static void __io_req_find_next_prep(struct io_kiocb *req)
1616 struct io_ring_ctx *ctx = req->ctx;
1619 spin_lock(&ctx->completion_lock);
1620 posted = io_disarm_next(req);
1621 io_commit_cqring(ctx);
1622 spin_unlock(&ctx->completion_lock);
1624 io_cqring_ev_posted(ctx);
1627 static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
1629 struct io_kiocb *nxt;
1632 * If LINK is set, we have dependent requests in this chain. If we
1633 * didn't fail this request, queue the first one up, moving any other
1634 * dependencies to the next request. In case of failure, fail the rest
1635 * of the chain.
1636 */
1637 if (unlikely(req->flags & IO_DISARM_MASK))
1638 __io_req_find_next_prep(req);
1644 static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
1648 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1649 atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1651 io_submit_flush_completions(ctx);
1652 mutex_unlock(&ctx->uring_lock);
1655 percpu_ref_put(&ctx->refs);
1658 static inline void ctx_commit_and_unlock(struct io_ring_ctx *ctx)
1660 io_commit_cqring(ctx);
1661 spin_unlock(&ctx->completion_lock);
1662 io_cqring_ev_posted(ctx);
1665 static void handle_prev_tw_list(struct io_wq_work_node *node,
1666 struct io_ring_ctx **ctx, bool *uring_locked)
1668 if (*ctx && !*uring_locked)
1669 spin_lock(&(*ctx)->completion_lock);
1672 struct io_wq_work_node *next = node->next;
1673 struct io_kiocb *req = container_of(node, struct io_kiocb,
1676 prefetch(container_of(next, struct io_kiocb, io_task_work.node));
1678 if (req->ctx != *ctx) {
1679 if (unlikely(!*uring_locked && *ctx))
1680 ctx_commit_and_unlock(*ctx);
1682 ctx_flush_and_put(*ctx, uring_locked);
1684 /* if not contended, grab and improve batching */
1685 *uring_locked = mutex_trylock(&(*ctx)->uring_lock);
1686 percpu_ref_get(&(*ctx)->refs);
1687 if (unlikely(!*uring_locked))
1688 spin_lock(&(*ctx)->completion_lock);
1690 if (likely(*uring_locked)) {
1691 req->io_task_work.func(req, uring_locked);
1693 req->cqe.flags = io_put_kbuf_comp(req);
1694 __io_req_complete_post(req);
1699 if (unlikely(!*uring_locked))
1700 ctx_commit_and_unlock(*ctx);
1703 static void handle_tw_list(struct io_wq_work_node *node,
1704 struct io_ring_ctx **ctx, bool *locked)
1707 struct io_wq_work_node *next = node->next;
1708 struct io_kiocb *req = container_of(node, struct io_kiocb,
1711 prefetch(container_of(next, struct io_kiocb, io_task_work.node));
1713 if (req->ctx != *ctx) {
1714 ctx_flush_and_put(*ctx, locked);
1716 /* if not contended, grab and improve batching */
1717 *locked = mutex_trylock(&(*ctx)->uring_lock);
1718 percpu_ref_get(&(*ctx)->refs);
1720 req->io_task_work.func(req, locked);
1725 static void tctx_task_work(struct callback_head *cb)
1727 bool uring_locked = false;
1728 struct io_ring_ctx *ctx = NULL;
1729 struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
1733 struct io_wq_work_node *node1, *node2;
1735 spin_lock_irq(&tctx->task_lock);
1736 node1 = tctx->prio_task_list.first;
1737 node2 = tctx->task_list.first;
1738 INIT_WQ_LIST(&tctx->task_list);
1739 INIT_WQ_LIST(&tctx->prio_task_list);
1740 if (!node2 && !node1)
1741 tctx->task_running = false;
1742 spin_unlock_irq(&tctx->task_lock);
1743 if (!node2 && !node1)
1747 handle_prev_tw_list(node1, &ctx, &uring_locked);
1749 handle_tw_list(node2, &ctx, &uring_locked);
1752 if (data_race(!tctx->task_list.first) &&
1753 data_race(!tctx->prio_task_list.first) && uring_locked)
1754 io_submit_flush_completions(ctx);
1757 ctx_flush_and_put(ctx, &uring_locked);
1759 /* relaxed read is enough as only the task itself sets ->in_idle */
1760 if (unlikely(atomic_read(&tctx->in_idle)))
1761 io_uring_drop_tctx_refs(current);
1764 static void __io_req_task_work_add(struct io_kiocb *req,
1765 struct io_uring_task *tctx,
1766 struct io_wq_work_list *list)
1768 struct io_ring_ctx *ctx = req->ctx;
1769 struct io_wq_work_node *node;
1770 unsigned long flags;
1773 spin_lock_irqsave(&tctx->task_lock, flags);
1774 wq_list_add_tail(&req->io_task_work.node, list);
1775 running = tctx->task_running;
1777 tctx->task_running = true;
1778 spin_unlock_irqrestore(&tctx->task_lock, flags);
1780 /* task_work already pending, we're done */
1784 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1785 atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1787 if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
1790 spin_lock_irqsave(&tctx->task_lock, flags);
1791 tctx->task_running = false;
1792 node = wq_list_merge(&tctx->prio_task_list, &tctx->task_list);
1793 spin_unlock_irqrestore(&tctx->task_lock, flags);
1796 req = container_of(node, struct io_kiocb, io_task_work.node);
1798 if (llist_add(&req->io_task_work.fallback_node,
1799 &req->ctx->fallback_llist))
1800 schedule_delayed_work(&req->ctx->fallback_work, 1);
1804 void io_req_task_work_add(struct io_kiocb *req)
1806 struct io_uring_task *tctx = req->task->io_uring;
1808 __io_req_task_work_add(req, tctx, &tctx->task_list);
1811 static void io_req_task_prio_work_add(struct io_kiocb *req)
1813 struct io_uring_task *tctx = req->task->io_uring;
1815 if (req->ctx->flags & IORING_SETUP_SQPOLL)
1816 __io_req_task_work_add(req, tctx, &tctx->prio_task_list);
1818 __io_req_task_work_add(req, tctx, &tctx->task_list);
1821 static void io_req_tw_post(struct io_kiocb *req, bool *locked)
1823 io_req_complete_post(req);
1826 void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags)
1828 io_req_set_res(req, res, cflags);
1829 req->io_task_work.func = io_req_tw_post;
1830 io_req_task_work_add(req);
1833 static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
1835 /* not needed for normal modes, but SQPOLL depends on it */
1836 io_tw_lock(req->ctx, locked);
1837 io_req_complete_failed(req, req->cqe.res);
1840 static void io_req_task_submit(struct io_kiocb *req, bool *locked)
1842 io_tw_lock(req->ctx, locked);
1843 /* req->task == current here, checking PF_EXITING is safe */
1844 if (likely(!(req->task->flags & PF_EXITING)))
1847 io_req_complete_failed(req, -EFAULT);
1850 void io_req_task_queue_fail(struct io_kiocb *req, int ret)
1852 io_req_set_res(req, ret, 0);
1853 req->io_task_work.func = io_req_task_cancel;
1854 io_req_task_work_add(req);
1857 static void io_req_task_queue(struct io_kiocb *req)
1859 req->io_task_work.func = io_req_task_submit;
1860 io_req_task_work_add(req);
1863 static void io_req_task_queue_reissue(struct io_kiocb *req)
1865 req->io_task_work.func = io_queue_iowq;
1866 io_req_task_work_add(req);
1869 void io_queue_next(struct io_kiocb *req)
1871 struct io_kiocb *nxt = io_req_find_next(req);
1874 io_req_task_queue(nxt);
1877 static void io_free_batch_list(struct io_ring_ctx *ctx,
1878 struct io_wq_work_node *node)
1879 __must_hold(&ctx->uring_lock)
1881 struct task_struct *task = NULL;
1885 struct io_kiocb *req = container_of(node, struct io_kiocb,
1888 if (unlikely(req->flags & IO_REQ_CLEAN_SLOW_FLAGS)) {
1889 if (req->flags & REQ_F_REFCOUNT) {
1890 node = req->comp_list.next;
1891 if (!req_ref_put_and_test(req))
1894 if ((req->flags & REQ_F_POLLED) && req->apoll) {
1895 struct async_poll *apoll = req->apoll;
1897 if (apoll->double_poll)
1898 kfree(apoll->double_poll);
1899 list_add(&apoll->poll.wait.entry,
1901 req->flags &= ~REQ_F_POLLED;
1903 if (req->flags & IO_REQ_LINK_FLAGS)
1905 if (unlikely(req->flags & IO_REQ_CLEAN_FLAGS))
1908 if (!(req->flags & REQ_F_FIXED_FILE))
1909 io_put_file(req->file);
1911 io_req_put_rsrc_locked(req, ctx);
1913 if (req->task != task) {
1915 io_put_task(task, task_refs);
1920 node = req->comp_list.next;
1921 io_req_add_to_cache(req, ctx);
1925 io_put_task(task, task_refs);
1928 static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
1929 __must_hold(&ctx->uring_lock)
1931 struct io_wq_work_node *node, *prev;
1932 struct io_submit_state *state = &ctx->submit_state;
1934 if (state->flush_cqes) {
1935 spin_lock(&ctx->completion_lock);
1936 wq_list_for_each(node, prev, &state->compl_reqs) {
1937 struct io_kiocb *req = container_of(node, struct io_kiocb,
1940 if (!(req->flags & REQ_F_CQE_SKIP))
1941 __io_fill_cqe_req(ctx, req);
1944 io_commit_cqring(ctx);
1945 spin_unlock(&ctx->completion_lock);
1946 io_cqring_ev_posted(ctx);
1947 state->flush_cqes = false;
1950 io_free_batch_list(ctx, state->compl_reqs.first);
1951 INIT_WQ_LIST(&state->compl_reqs);
1955 * Drop reference to request, return next in chain (if there is one) if this
1956 * was the last reference to this request.
1958 static inline struct io_kiocb *io_put_req_find_next(struct io_kiocb *req)
1960 struct io_kiocb *nxt = NULL;
1962 if (req_ref_put_and_test(req)) {
1963 if (unlikely(req->flags & IO_REQ_LINK_FLAGS))
1964 nxt = io_req_find_next(req);
1970 static unsigned io_cqring_events(struct io_ring_ctx *ctx)
1972 /* See comment at the top of this file */
1974 return __io_cqring_events(ctx);
1977 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
1979 struct io_rings *rings = ctx->rings;
1981 /* make sure SQ entry isn't read before tail */
1982 return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
1985 static inline bool io_run_task_work(void)
1987 if (test_thread_flag(TIF_NOTIFY_SIGNAL) || task_work_pending(current)) {
1988 __set_current_state(TASK_RUNNING);
1989 clear_notify_signal();
1990 if (task_work_pending(current))
1998 static int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
2000 struct io_wq_work_node *pos, *start, *prev;
2001 unsigned int poll_flags = BLK_POLL_NOSLEEP;
2002 DEFINE_IO_COMP_BATCH(iob);
2006 * Only spin for completions if we don't have multiple devices hanging
2007 * off our complete list.
2009 if (ctx->poll_multi_queue || force_nonspin)
2010 poll_flags |= BLK_POLL_ONESHOT;
2012 wq_list_for_each(pos, start, &ctx->iopoll_list) {
2013 struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
2014 struct io_rw *rw = io_kiocb_to_cmd(req);
2018 * Move completed and retryable entries to our local lists.
2019 * If we find a request that requires polling, break out
2020 * and complete those lists first, if we have entries there.
2022 if (READ_ONCE(req->iopoll_completed))
2025 ret = rw->kiocb.ki_filp->f_op->iopoll(&rw->kiocb, &iob, poll_flags);
2026 if (unlikely(ret < 0))
2029 poll_flags |= BLK_POLL_ONESHOT;
2031 /* iopoll may have completed current req */
2032 if (!rq_list_empty(iob.req_list) ||
2033 READ_ONCE(req->iopoll_completed))
2037 if (!rq_list_empty(iob.req_list))
2043 wq_list_for_each_resume(pos, prev) {
2044 struct io_kiocb *req = container_of(pos, struct io_kiocb, comp_list);
2046 /* order with io_complete_rw_iopoll(), e.g. ->result updates */
2047 if (!smp_load_acquire(&req->iopoll_completed))
2050 if (unlikely(req->flags & REQ_F_CQE_SKIP))
2053 req->cqe.flags = io_put_kbuf(req, 0);
2054 __io_fill_cqe_req(req->ctx, req);
2057 if (unlikely(!nr_events))
2060 io_commit_cqring(ctx);
2061 io_cqring_ev_posted_iopoll(ctx);
2062 pos = start ? start->next : ctx->iopoll_list.first;
2063 wq_list_cut(&ctx->iopoll_list, prev, start);
2064 io_free_batch_list(ctx, pos);
2069 * We can't just wait for polled events to come to us, we have to actively
2070 * find and complete them.
2072 static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
2074 if (!(ctx->flags & IORING_SETUP_IOPOLL))
2077 mutex_lock(&ctx->uring_lock);
2078 while (!wq_list_empty(&ctx->iopoll_list)) {
2079 /* let it sleep and repeat later if can't complete a request */
2080 if (io_do_iopoll(ctx, true) == 0)
2083 * Ensure we allow local-to-the-cpu processing to take place,
2084 * in this case we need to ensure that we reap all events.
2085 * Also let task_work, etc. to progress by releasing the mutex
2087 if (need_resched()) {
2088 mutex_unlock(&ctx->uring_lock);
2090 mutex_lock(&ctx->uring_lock);
2093 mutex_unlock(&ctx->uring_lock);
2096 static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
2098 unsigned int nr_events = 0;
2100 unsigned long check_cq;
2103 * Don't enter poll loop if we already have events pending.
2104 * If we do, we can potentially be spinning for commands that
2105 * already triggered a CQE (eg in error).
2107 check_cq = READ_ONCE(ctx->check_cq);
2108 if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
2109 __io_cqring_overflow_flush(ctx, false);
2110 if (io_cqring_events(ctx))
2114 * Similarly do not spin if we have not informed the user of any
2115 * dropped CQE.
2116 */
2117 if (unlikely(check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)))
2122 * If a submit got punted to a workqueue, we can have the
2123 * application entering polling for a command before it gets
2124 * issued. That app will hold the uring_lock for the duration
2125 * of the poll right here, so we need to take a breather every
2126 * now and then to ensure that the issue has a chance to add
2127 * the poll to the issued list. Otherwise we can spin here
2128 * forever, while the workqueue is stuck trying to acquire the
2129 * lock.
2130 */
2131 if (wq_list_empty(&ctx->iopoll_list)) {
2132 u32 tail = ctx->cached_cq_tail;
2134 mutex_unlock(&ctx->uring_lock);
2136 mutex_lock(&ctx->uring_lock);
2138 /* some requests don't go through iopoll_list */
2139 if (tail != ctx->cached_cq_tail ||
2140 wq_list_empty(&ctx->iopoll_list))
2143 ret = io_do_iopoll(ctx, !min);
2148 } while (nr_events < min && !need_resched());
2153 static void kiocb_end_write(struct io_kiocb *req)
2156 * Tell lockdep we inherited freeze protection from submission
2157 * thread.
2158 */
2159 if (req->flags & REQ_F_ISREG) {
2160 struct super_block *sb = file_inode(req->file)->i_sb;
2162 __sb_writers_acquired(sb, SB_FREEZE_WRITE);
2168 static bool io_resubmit_prep(struct io_kiocb *req)
2170 struct io_async_rw *io = req->async_data;
2172 if (!req_has_async_data(req))
2173 return !io_req_prep_async(req);
2174 iov_iter_restore(&io->s.iter, &io->s.iter_state);
2178 static bool io_rw_should_reissue(struct io_kiocb *req)
2180 umode_t mode = file_inode(req->file)->i_mode;
2181 struct io_ring_ctx *ctx = req->ctx;
2183 if (!S_ISBLK(mode) && !S_ISREG(mode))
2185 if ((req->flags & REQ_F_NOWAIT) || (io_wq_current_is_worker() &&
2186 !(ctx->flags & IORING_SETUP_IOPOLL)))
2189 * If ref is dying, we might be running poll reap from the exit work.
2190 * Don't attempt to reissue from that path, just let it fail with
2191 * -EAGAIN.
2192 */
2193 if (percpu_ref_is_dying(&ctx->refs))
2196 * Play it safe and assume not safe to re-import and reissue if we're
2197 * not in the original thread group (or in task context).
2199 if (!same_thread_group(req->task, current) || !in_task())
2204 static bool io_resubmit_prep(struct io_kiocb *req)
2208 static bool io_rw_should_reissue(struct io_kiocb *req)
2214 static bool __io_complete_rw_common(struct io_kiocb *req, long res)
2216 struct io_rw *rw = io_kiocb_to_cmd(req);
2218 if (rw->kiocb.ki_flags & IOCB_WRITE) {
2219 kiocb_end_write(req);
2220 fsnotify_modify(req->file);
2222 fsnotify_access(req->file);
2224 if (unlikely(res != req->cqe.res)) {
2225 if ((res == -EAGAIN || res == -EOPNOTSUPP) &&
2226 io_rw_should_reissue(req)) {
2227 req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
2236 inline void io_req_task_complete(struct io_kiocb *req, bool *locked)
2239 req->cqe.flags |= io_put_kbuf(req, 0);
2240 req->flags |= REQ_F_COMPLETE_INLINE;
2241 io_req_add_compl_list(req);
2243 req->cqe.flags |= io_put_kbuf(req, IO_URING_F_UNLOCKED);
2244 io_req_complete_post(req);
2248 static void __io_complete_rw(struct io_kiocb *req, long res,
2249 unsigned int issue_flags)
2251 if (__io_complete_rw_common(req, res))
2253 io_req_set_res(req, req->cqe.res, io_put_kbuf(req, issue_flags));
2254 __io_req_complete(req, issue_flags);
2257 static void io_complete_rw(struct kiocb *kiocb, long res)
2259 struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
2260 struct io_kiocb *req = cmd_to_io_kiocb(rw);
2262 if (__io_complete_rw_common(req, res))
2264 io_req_set_res(req, res, 0);
2265 req->io_task_work.func = io_req_task_complete;
2266 io_req_task_prio_work_add(req);
2269 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)
2271 struct io_rw *rw = container_of(kiocb, struct io_rw, kiocb);
2272 struct io_kiocb *req = cmd_to_io_kiocb(rw);
2274 if (kiocb->ki_flags & IOCB_WRITE)
2275 kiocb_end_write(req);
2276 if (unlikely(res != req->cqe.res)) {
2277 if (res == -EAGAIN && io_rw_should_reissue(req)) {
2278 req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
2284 /* order with io_iopoll_complete() checking ->iopoll_completed */
2285 smp_store_release(&req->iopoll_completed, 1);
2289 * After the iocb has been issued, it's safe to be found on the poll list.
2290 * Adding the kiocb to the list AFTER submission ensures that we don't
2291 * find it from an io_do_iopoll() thread before the issuer is done
2292 * accessing the kiocb cookie.
2294 static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
2296 struct io_ring_ctx *ctx = req->ctx;
2297 const bool needs_lock = issue_flags & IO_URING_F_UNLOCKED;
2299 /* workqueue context doesn't hold uring_lock, grab it now */
2300 if (unlikely(needs_lock))
2301 mutex_lock(&ctx->uring_lock);
2304 * Track whether we have multiple files in our lists. This will impact
2305 * how we do polling eventually, not spinning if we're on potentially
2306 * different devices.
2308 if (wq_list_empty(&ctx->iopoll_list)) {
2309 ctx->poll_multi_queue = false;
2310 } else if (!ctx->poll_multi_queue) {
2311 struct io_kiocb *list_req;
2313 list_req = container_of(ctx->iopoll_list.first, struct io_kiocb,
2315 if (list_req->file != req->file)
2316 ctx->poll_multi_queue = true;
2320 * For fast devices, IO may have already completed. If it has, add
2321 * it to the front so we find it first.
2323 if (READ_ONCE(req->iopoll_completed))
2324 wq_list_add_head(&req->comp_list, &ctx->iopoll_list);
2326 wq_list_add_tail(&req->comp_list, &ctx->iopoll_list);
2328 if (unlikely(needs_lock)) {
2330 * If IORING_SETUP_SQPOLL is enabled, sqes are either handled
2331 * in sq thread task context or in io worker task context. If
2332 * the current task context is sq thread, we don't need to check
2333 * whether we should wake up the sq thread.
2335 if ((ctx->flags & IORING_SETUP_SQPOLL) &&
2336 wq_has_sleeper(&ctx->sq_data->wait))
2337 wake_up(&ctx->sq_data->wait);
2339 mutex_unlock(&ctx->uring_lock);
2343 static bool io_bdev_nowait(struct block_device *bdev)
2345 return !bdev || blk_queue_nowait(bdev_get_queue(bdev));
2349 * If we tracked the file through the SCM inflight mechanism, we could support
2350 * any file. For now, just ensure that anything potentially problematic is done
2351 * inline.
2352 */
2353 static bool __io_file_supports_nowait(struct file *file, umode_t mode)
2355 if (S_ISBLK(mode)) {
2356 if (IS_ENABLED(CONFIG_BLOCK) &&
2357 io_bdev_nowait(I_BDEV(file->f_mapping->host)))
2363 if (S_ISREG(mode)) {
2364 if (IS_ENABLED(CONFIG_BLOCK) &&
2365 io_bdev_nowait(file->f_inode->i_sb->s_bdev) &&
2366 file->f_op != &io_uring_fops)
2371 /* any ->read/write should understand O_NONBLOCK */
2372 if (file->f_flags & O_NONBLOCK)
2374 return file->f_mode & FMODE_NOWAIT;
2378 * If we tracked the file through the SCM inflight mechanism, we could support
2379 * any file. For now, just ensure that anything potentially problematic is done
2380 * inline.
2381 */
2382 static unsigned int io_file_get_flags(struct file *file)
2384 umode_t mode = file_inode(file)->i_mode;
2385 unsigned int res = 0;
2389 if (__io_file_supports_nowait(file, mode))
2391 if (io_file_need_scm(file))
2396 static inline bool io_file_supports_nowait(struct io_kiocb *req)
2398 return req->flags & REQ_F_SUPPORT_NOWAIT;
2401 static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2403 struct io_rw *rw = io_kiocb_to_cmd(req);
2407 rw->kiocb.ki_pos = READ_ONCE(sqe->off);
2408 /* used for fixed read/write too - just read unconditionally */
2409 req->buf_index = READ_ONCE(sqe->buf_index);
2411 if (req->opcode == IORING_OP_READ_FIXED ||
2412 req->opcode == IORING_OP_WRITE_FIXED) {
2413 struct io_ring_ctx *ctx = req->ctx;
2416 if (unlikely(req->buf_index >= ctx->nr_user_bufs))
2418 index = array_index_nospec(req->buf_index, ctx->nr_user_bufs);
2419 req->imu = ctx->user_bufs[index];
2420 io_req_set_rsrc_node(req, ctx, 0);
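/*
 * Resolving ->imu here at prep time and taking an rsrc node reference keeps
 * the registered buffer alive even if the application unregisters it before
 * this IO completes.
 */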
2423 ioprio = READ_ONCE(sqe->ioprio);
2425 ret = ioprio_check_cap(ioprio);
2429 rw->kiocb.ki_ioprio = ioprio;
2431 rw->kiocb.ki_ioprio = get_current_ioprio();
2434 rw->addr = READ_ONCE(sqe->addr);
2435 rw->len = READ_ONCE(sqe->len);
2436 rw->flags = READ_ONCE(sqe->rw_flags);
2440 static void io_readv_writev_cleanup(struct io_kiocb *req)
2442 struct io_async_rw *io = req->async_data;
2444 kfree(io->free_iovec);
2447 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
2453 case -ERESTARTNOINTR:
2454 case -ERESTARTNOHAND:
2455 case -ERESTART_RESTARTBLOCK:
2457 * We can't just restart the syscall, since previously
2458 * submitted sqes may already be in progress. Just fail this IO with EINTR.
2464 kiocb->ki_complete(kiocb, ret);
2468 static inline loff_t *io_kiocb_update_pos(struct io_kiocb *req)
2470 struct io_rw *rw = io_kiocb_to_cmd(req);
2472 if (rw->kiocb.ki_pos != -1)
2473 return &rw->kiocb.ki_pos;
2475 if (!(req->file->f_mode & FMODE_STREAM)) {
2476 req->flags |= REQ_F_CUR_POS;
2477 rw->kiocb.ki_pos = req->file->f_pos;
2478 return &rw->kiocb.ki_pos;
2481 rw->kiocb.ki_pos = 0;
2485 static void kiocb_done(struct io_kiocb *req, ssize_t ret,
2486 unsigned int issue_flags)
2488 struct io_async_rw *io = req->async_data;
2489 struct io_rw *rw = io_kiocb_to_cmd(req);
2491 /* add previously done IO, if any */
2492 if (req_has_async_data(req) && io->bytes_done > 0) {
2493 if (ret < 0)
2494 ret = io->bytes_done;
2495 else
2496 ret += io->bytes_done;
2499 if (req->flags & REQ_F_CUR_POS)
2500 req->file->f_pos = rw->kiocb.ki_pos;
2501 if (ret >= 0 && (rw->kiocb.ki_complete == io_complete_rw))
2502 __io_complete_rw(req, ret, issue_flags);
2504 io_rw_done(&rw->kiocb, ret);
2506 if (req->flags & REQ_F_REISSUE) {
2507 req->flags &= ~REQ_F_REISSUE;
2508 if (io_resubmit_prep(req))
2509 io_req_task_queue_reissue(req);
2511 io_req_task_queue_fail(req, ret);
2515 static int __io_import_fixed(struct io_kiocb *req, int ddir,
2516 struct iov_iter *iter, struct io_mapped_ubuf *imu)
2518 struct io_rw *rw = io_kiocb_to_cmd(req);
2519 size_t len = rw->len;
2520 u64 buf_end, buf_addr = rw->addr;
2523 if (unlikely(check_add_overflow(buf_addr, (u64)len, &buf_end)))
2525 /* not inside the mapped region */
2526 if (unlikely(buf_addr < imu->ubuf || buf_end > imu->ubuf_end))
2530 * May not be the start of the buffer; set the size appropriately
2531 * and advance us to the beginning.
2533 offset = buf_addr - imu->ubuf;
2534 iov_iter_bvec(iter, ddir, imu->bvec, imu->nr_bvecs, offset + len);
2538 * Don't use iov_iter_advance() here, as it's really slow when
2539 * using the latter parts of a big fixed buffer - it iterates
2540 * over each segment manually. We can cheat a bit here, because we know that:
2543 * 1) it's a BVEC iter, we set it up
2544 * 2) all bvecs are PAGE_SIZE in size, except potentially the
2545 * first and last bvec
2547 * So just find our index, and adjust the iterator afterwards.
2548 * If the offset is within the first bvec (or the whole first
2549 * bvec), just use iov_iter_advance(). This makes it easier
2550 * since we can just skip the first segment, which may not
2551 * be PAGE_SIZE aligned.
2553 const struct bio_vec *bvec = imu->bvec;
2555 if (offset <= bvec->bv_len) {
2556 iov_iter_advance(iter, offset);
2557 } else {
2558 unsigned long seg_skip;
2560 /* skip first vec */
2561 offset -= bvec->bv_len;
2562 seg_skip = 1 + (offset >> PAGE_SHIFT);
2564 iter->bvec = bvec + seg_skip;
2565 iter->nr_segs -= seg_skip;
2566 iter->count -= bvec->bv_len + offset;
2567 iter->iov_offset = offset & ~PAGE_MASK;
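/*
 * Illustrative example (assuming 4K pages): with a 1024 byte head bvec and
 * offset == 9728 we take this branch, offset becomes 8704 after subtracting
 * the head, seg_skip = 1 + (8704 >> 12) = 3, so the head bvec plus two full
 * pages are skipped and iov_offset ends up 8704 & 4095 = 512 bytes into the
 * fourth bvec.
 */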
2574 static int io_import_fixed(struct io_kiocb *req, int rw, struct iov_iter *iter,
2575 unsigned int issue_flags)
2577 if (WARN_ON_ONCE(!req->imu))
2579 return __io_import_fixed(req, rw, iter, req->imu);
2582 static int io_buffer_add_list(struct io_ring_ctx *ctx,
2583 struct io_buffer_list *bl, unsigned int bgid)
2586 if (bgid < BGID_ARRAY)
2589 return xa_err(xa_store(&ctx->io_bl_xa, bgid, bl, GFP_KERNEL));
2592 static void __user *io_provided_buffer_select(struct io_kiocb *req, size_t *len,
2593 struct io_buffer_list *bl)
2595 if (!list_empty(&bl->buf_list)) {
2596 struct io_buffer *kbuf;
2598 kbuf = list_first_entry(&bl->buf_list, struct io_buffer, list);
2599 list_del(&kbuf->list);
2600 if (*len > kbuf->len)
2602 req->flags |= REQ_F_BUFFER_SELECTED;
2604 req->buf_index = kbuf->bid;
2605 return u64_to_user_ptr(kbuf->addr);
2610 static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len,
2611 struct io_buffer_list *bl,
2612 unsigned int issue_flags)
2614 struct io_uring_buf_ring *br = bl->buf_ring;
2615 struct io_uring_buf *buf;
2616 __u16 head = bl->head;
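/*
 * The ring tail is written by the application (which is expected to publish
 * it with a release store, as liburing does); the smp_load_acquire() below
 * pairs with that so the buffer entries are visible before we consume them.
 * The head is kernel-private and only advanced when a buffer is committed.
 */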
2618 if (unlikely(smp_load_acquire(&br->tail) == head))
2622 if (head < IO_BUFFER_LIST_BUF_PER_PAGE) {
2623 buf = &br->bufs[head];
2624 } else {
2625 int off = head & (IO_BUFFER_LIST_BUF_PER_PAGE - 1);
2626 int index = head / IO_BUFFER_LIST_BUF_PER_PAGE;
2627 buf = page_address(bl->buf_pages[index]);
2628 buf += off;
2630 if (*len > buf->len)
2632 req->flags |= REQ_F_BUFFER_RING;
2634 req->buf_index = buf->bid;
2636 if (issue_flags & IO_URING_F_UNLOCKED || !file_can_poll(req->file)) {
2638 * If we came in unlocked, we have no choice but to consume the
2639 * buffer here. This does mean it'll be pinned until the IO
2640 * completes. But coming in unlocked means we're in io-wq
2641 * context, hence there should be no further retry. For the
2642 * locked case, the caller must ensure to call the commit when
2643 * the transfer completes (or if we get -EAGAIN and must poll it or retry).
2646 req->buf_list = NULL;
2649 return u64_to_user_ptr(buf->addr);
2652 void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
2653 unsigned int issue_flags)
2655 struct io_ring_ctx *ctx = req->ctx;
2656 struct io_buffer_list *bl;
2657 void __user *ret = NULL;
2659 io_ring_submit_lock(req->ctx, issue_flags);
2661 bl = io_buffer_get_list(ctx, req->buf_index);
2663 if (bl->buf_nr_pages)
2664 ret = io_ring_buffer_select(req, len, bl, issue_flags);
2666 ret = io_provided_buffer_select(req, len, bl);
2668 io_ring_submit_unlock(req->ctx, issue_flags);
2672 #ifdef CONFIG_COMPAT
2673 static ssize_t io_compat_import(struct io_kiocb *req, struct iovec *iov,
2674 unsigned int issue_flags)
2676 struct io_rw *rw = io_kiocb_to_cmd(req);
2677 struct compat_iovec __user *uiov;
2678 compat_ssize_t clen;
2682 uiov = u64_to_user_ptr(rw->addr);
2683 if (!access_ok(uiov, sizeof(*uiov)))
2685 if (__get_user(clen, &uiov->iov_len))
2691 buf = io_buffer_select(req, &len, issue_flags);
2694 rw->addr = (unsigned long) buf;
2695 iov[0].iov_base = buf;
2696 rw->len = iov[0].iov_len = (compat_size_t) len;
2701 static ssize_t __io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
2702 unsigned int issue_flags)
2704 struct io_rw *rw = io_kiocb_to_cmd(req);
2705 struct iovec __user *uiov = u64_to_user_ptr(rw->addr);
2709 if (copy_from_user(iov, uiov, sizeof(*uiov)))
2712 len = iov[0].iov_len;
2715 buf = io_buffer_select(req, &len, issue_flags);
2718 rw->addr = (unsigned long) buf;
2719 iov[0].iov_base = buf;
2720 rw->len = iov[0].iov_len = len;
2724 static ssize_t io_iov_buffer_select(struct io_kiocb *req, struct iovec *iov,
2725 unsigned int issue_flags)
2727 struct io_rw *rw = io_kiocb_to_cmd(req);
2729 if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
2730 iov[0].iov_base = u64_to_user_ptr(rw->addr);
2731 iov[0].iov_len = rw->len;
2737 #ifdef CONFIG_COMPAT
2738 if (req->ctx->compat)
2739 return io_compat_import(req, iov, issue_flags);
2742 return __io_iov_buffer_select(req, iov, issue_flags);
2745 static struct iovec *__io_import_iovec(int ddir, struct io_kiocb *req,
2746 struct io_rw_state *s,
2747 unsigned int issue_flags)
2749 struct io_rw *rw = io_kiocb_to_cmd(req);
2750 struct iov_iter *iter = &s->iter;
2751 u8 opcode = req->opcode;
2752 struct iovec *iovec;
2757 if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
2758 ret = io_import_fixed(req, ddir, iter, issue_flags);
2760 return ERR_PTR(ret);
2764 buf = u64_to_user_ptr(rw->addr);
2767 if (opcode == IORING_OP_READ || opcode == IORING_OP_WRITE) {
2768 if (io_do_buffer_select(req)) {
2769 buf = io_buffer_select(req, &sqe_len, issue_flags);
2771 return ERR_PTR(-ENOBUFS);
2772 rw->addr = (unsigned long) buf;
2776 ret = import_single_range(ddir, buf, sqe_len, s->fast_iov, iter);
2778 return ERR_PTR(ret);
2782 iovec = s->fast_iov;
2783 if (req->flags & REQ_F_BUFFER_SELECT) {
2784 ret = io_iov_buffer_select(req, iovec, issue_flags);
2786 return ERR_PTR(ret);
2787 iov_iter_init(iter, ddir, iovec, 1, iovec->iov_len);
2791 ret = __import_iovec(ddir, buf, sqe_len, UIO_FASTIOV, &iovec, iter,
2793 if (unlikely(ret < 0))
2794 return ERR_PTR(ret);
2798 static inline int io_import_iovec(int rw, struct io_kiocb *req,
2799 struct iovec **iovec, struct io_rw_state *s,
2800 unsigned int issue_flags)
2802 *iovec = __io_import_iovec(rw, req, s, issue_flags);
2803 if (unlikely(IS_ERR(*iovec)))
2804 return PTR_ERR(*iovec);
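/*
 * Save the iter state up front so a later retry (e.g. after a partial
 * transfer or -EAGAIN) can rewind with iov_iter_restore() instead of
 * re-importing the vector.
 */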
2806 iov_iter_save_state(&s->iter, &s->iter_state);
2810 static inline loff_t *io_kiocb_ppos(struct kiocb *kiocb)
2812 return (kiocb->ki_filp->f_mode & FMODE_STREAM) ? NULL : &kiocb->ki_pos;
2816 * For files that don't have ->read_iter() and ->write_iter(), handle them
2817 * by looping over ->read() or ->write() manually.
2819 static ssize_t loop_rw_iter(int ddir, struct io_rw *rw, struct iov_iter *iter)
2821 struct kiocb *kiocb = &rw->kiocb;
2822 struct file *file = kiocb->ki_filp;
2827 * Don't support polled IO through this interface, and we can't
2828 * support non-blocking either. For the latter, this just causes
2829 * the kiocb to be handled from an async context.
2831 if (kiocb->ki_flags & IOCB_HIPRI)
2833 if ((kiocb->ki_flags & IOCB_NOWAIT) &&
2834 !(kiocb->ki_filp->f_flags & O_NONBLOCK))
2837 ppos = io_kiocb_ppos(kiocb);
2839 while (iov_iter_count(iter)) {
2843 if (!iov_iter_is_bvec(iter)) {
2844 iovec = iov_iter_iovec(iter);
2846 iovec.iov_base = u64_to_user_ptr(rw->addr);
2847 iovec.iov_len = rw->len;
2850 if (ddir == READ)
2851 nr = file->f_op->read(file, iovec.iov_base,
2852 iovec.iov_len, ppos);
2853 else
2854 nr = file->f_op->write(file, iovec.iov_base,
2855 iovec.iov_len, ppos);
2864 if (!iov_iter_is_bvec(iter)) {
2865 iov_iter_advance(iter, nr);
2872 if (nr != iovec.iov_len)
2879 static void io_req_map_rw(struct io_kiocb *req, const struct iovec *iovec,
2880 const struct iovec *fast_iov, struct iov_iter *iter)
2882 struct io_async_rw *io = req->async_data;
2884 memcpy(&io->s.iter, iter, sizeof(*iter));
2885 io->free_iovec = iovec;
2887 /* can only be fixed buffers, no need to do anything */
2888 if (iov_iter_is_bvec(iter))
2891 unsigned iov_off = 0;
2893 io->s.iter.iov = io->s.fast_iov;
2894 if (iter->iov != fast_iov) {
2895 iov_off = iter->iov - fast_iov;
2896 io->s.iter.iov += iov_off;
2898 if (io->s.fast_iov != fast_iov)
2899 memcpy(io->s.fast_iov + iov_off, fast_iov + iov_off,
2900 sizeof(struct iovec) * iter->nr_segs);
2902 req->flags |= REQ_F_NEED_CLEANUP;
2906 bool io_alloc_async_data(struct io_kiocb *req)
2908 WARN_ON_ONCE(!io_op_defs[req->opcode].async_size);
2909 req->async_data = kmalloc(io_op_defs[req->opcode].async_size, GFP_KERNEL);
2910 if (req->async_data) {
2911 req->flags |= REQ_F_ASYNC_DATA;
2917 static int io_setup_async_rw(struct io_kiocb *req, const struct iovec *iovec,
2918 struct io_rw_state *s, bool force)
2920 if (!force && !io_op_defs[req->opcode].prep_async)
2922 if (!req_has_async_data(req)) {
2923 struct io_async_rw *iorw;
2925 if (io_alloc_async_data(req)) {
2930 io_req_map_rw(req, iovec, s->fast_iov, &s->iter);
2931 iorw = req->async_data;
2932 /* we've copied and mapped the iter, ensure state is saved */
2933 iov_iter_save_state(&iorw->s.iter, &iorw->s.iter_state);
2938 static inline int io_rw_prep_async(struct io_kiocb *req, int rw)
2940 struct io_async_rw *iorw = req->async_data;
2944 /* submission path, ->uring_lock should already be taken */
2945 ret = io_import_iovec(rw, req, &iov, &iorw->s, 0);
2946 if (unlikely(ret < 0))
2949 iorw->bytes_done = 0;
2950 iorw->free_iovec = iov;
2952 req->flags |= REQ_F_NEED_CLEANUP;
2956 static int io_readv_prep_async(struct io_kiocb *req)
2958 return io_rw_prep_async(req, READ);
2961 static int io_writev_prep_async(struct io_kiocb *req)
2963 return io_rw_prep_async(req, WRITE);
2967 * This is our waitqueue callback handler, registered through __folio_lock_async()
2968 * when we initially tried to do the IO with the iocb and armed our waitqueue.
2969 * This gets called when the page is unlocked, and we generally expect that to
2970 * happen when the page IO is completed and the page is now uptodate. This will
2971 * queue a task_work based retry of the operation, attempting to copy the data
2972 * again. If the latter fails because the page was NOT uptodate, then we will
2973 * do a thread based blocking retry of the operation. That's the unexpected slow path.
2976 static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
2977 int sync, void *arg)
2979 struct wait_page_queue *wpq;
2980 struct io_kiocb *req = wait->private;
2981 struct io_rw *rw = io_kiocb_to_cmd(req);
2982 struct wait_page_key *key = arg;
2984 wpq = container_of(wait, struct wait_page_queue, wait);
2986 if (!wake_page_match(wpq, key))
2989 rw->kiocb.ki_flags &= ~IOCB_WAITQ;
2990 list_del_init(&wait->entry);
2991 io_req_task_queue(req);
2996 * This controls whether a given IO request should be armed for async page
2997 * based retry. If we return false here, the request is handed to the async
2998 * worker threads for retry. If we're doing buffered reads on a regular file,
2999 * we prepare a private wait_page_queue entry and retry the operation. This
3000 * will either succeed because the page is now uptodate and unlocked, or it
3001 * will register a callback when the page is unlocked at IO completion. Through
3002 * that callback, io_uring uses task_work to setup a retry of the operation.
3003 * That retry will attempt the buffered read again. The retry will generally
3004 * succeed, or in rare cases where it fails, we then fall back to using the
3005 * async worker threads for a blocking retry.
3007 static bool io_rw_should_retry(struct io_kiocb *req)
3009 struct io_async_rw *io = req->async_data;
3010 struct wait_page_queue *wait = &io->wpq;
3011 struct io_rw *rw = io_kiocb_to_cmd(req);
3012 struct kiocb *kiocb = &rw->kiocb;
3014 /* never retry for NOWAIT, we just complete with -EAGAIN */
3015 if (req->flags & REQ_F_NOWAIT)
3018 /* Only for buffered IO */
3019 if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_HIPRI))
3023 * just use poll if we can, and don't attempt if the fs doesn't
3024 * support callback based unlocks
3026 if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
3029 wait->wait.func = io_async_buf_func;
3030 wait->wait.private = req;
3031 wait->wait.flags = 0;
3032 INIT_LIST_HEAD(&wait->wait.entry);
3033 kiocb->ki_flags |= IOCB_WAITQ;
3034 kiocb->ki_flags &= ~IOCB_NOWAIT;
3035 kiocb->ki_waitq = wait;
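/*
 * With IOCB_WAITQ set (and IOCB_NOWAIT cleared), a buffered read that hits a
 * locked folio returns -EIOCBQUEUED and io_async_buf_func() is called from
 * the unlock path to queue a task_work based retry.
 */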
3039 static inline int io_iter_do_read(struct io_rw *rw, struct iov_iter *iter)
3041 struct file *file = rw->kiocb.ki_filp;
3043 if (likely(file->f_op->read_iter))
3044 return call_read_iter(file, &rw->kiocb, iter);
3045 else if (file->f_op->read)
3046 return loop_rw_iter(READ, rw, iter);
3051 static bool need_read_all(struct io_kiocb *req)
3053 return req->flags & REQ_F_ISREG ||
3054 S_ISBLK(file_inode(req->file)->i_mode);
3057 static int io_rw_init_file(struct io_kiocb *req, fmode_t mode)
3059 struct io_rw *rw = io_kiocb_to_cmd(req);
3060 struct kiocb *kiocb = &rw->kiocb;
3061 struct io_ring_ctx *ctx = req->ctx;
3062 struct file *file = req->file;
3065 if (unlikely(!file || !(file->f_mode & mode)))
3068 if (!io_req_ffs_set(req))
3069 req->flags |= io_file_get_flags(file) << REQ_F_SUPPORT_NOWAIT_BIT;
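/*
 * The FFS_* bits returned by io_file_get_flags() are laid out so that,
 * shifted by REQ_F_SUPPORT_NOWAIT_BIT, they land on the matching REQ_F_*
 * bits, caching the per-file properties directly in req->flags.
 */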
3071 kiocb->ki_flags = iocb_flags(file);
3072 ret = kiocb_set_rw_flags(kiocb, rw->flags);
3077 * If the file is marked O_NONBLOCK, still allow retry for it if it
3078 * supports async. Otherwise it's impossible to use O_NONBLOCK files
3079 * reliably. If not, or if IOCB_NOWAIT is set, don't retry.
3081 if ((kiocb->ki_flags & IOCB_NOWAIT) ||
3082 ((file->f_flags & O_NONBLOCK) && !io_file_supports_nowait(req)))
3083 req->flags |= REQ_F_NOWAIT;
3085 if (ctx->flags & IORING_SETUP_IOPOLL) {
3086 if (!(kiocb->ki_flags & IOCB_DIRECT) || !file->f_op->iopoll)
3089 kiocb->private = NULL;
3090 kiocb->ki_flags |= IOCB_HIPRI | IOCB_ALLOC_CACHE;
3091 kiocb->ki_complete = io_complete_rw_iopoll;
3092 req->iopoll_completed = 0;
3094 if (kiocb->ki_flags & IOCB_HIPRI)
3096 kiocb->ki_complete = io_complete_rw;
3102 static int io_read(struct io_kiocb *req, unsigned int issue_flags)
3104 struct io_rw *rw = io_kiocb_to_cmd(req);
3105 struct io_rw_state __s, *s = &__s;
3106 struct iovec *iovec;
3107 struct kiocb *kiocb = &rw->kiocb;
3108 bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
3109 struct io_async_rw *io;
3113 if (!req_has_async_data(req)) {
3114 ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
3115 if (unlikely(ret < 0))
3118 io = req->async_data;
3122 * Safe and required to re-import if we're using provided
3123 * buffers, as we dropped the selected one before retry.
3125 if (io_do_buffer_select(req)) {
3126 ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
3127 if (unlikely(ret < 0))
3132 * We come here from an earlier attempt, restore our state to
3133 * match in case it doesn't. It's cheap enough that we don't
3134 * need to make this conditional.
3136 iov_iter_restore(&s->iter, &s->iter_state);
3139 ret = io_rw_init_file(req, FMODE_READ);
3140 if (unlikely(ret)) {
3144 req->cqe.res = iov_iter_count(&s->iter);
3146 if (force_nonblock) {
3147 /* If the file doesn't support async, just async punt */
3148 if (unlikely(!io_file_supports_nowait(req))) {
3149 ret = io_setup_async_rw(req, iovec, s, true);
3150 return ret ?: -EAGAIN;
3152 kiocb->ki_flags |= IOCB_NOWAIT;
3154 /* Ensure we clear previously set non-block flag */
3155 kiocb->ki_flags &= ~IOCB_NOWAIT;
3158 ppos = io_kiocb_update_pos(req);
3160 ret = rw_verify_area(READ, req->file, ppos, req->cqe.res);
3161 if (unlikely(ret)) {
3166 ret = io_iter_do_read(rw, &s->iter);
3168 if (ret == -EAGAIN || (req->flags & REQ_F_REISSUE)) {
3169 req->flags &= ~REQ_F_REISSUE;
3170 /* if we can poll, just do that */
3171 if (req->opcode == IORING_OP_READ && file_can_poll(req->file))
3173 /* IOPOLL retry should happen for io-wq threads */
3174 if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL))
3176 /* no retry on NONBLOCK nor RWF_NOWAIT */
3177 if (req->flags & REQ_F_NOWAIT)
3180 } else if (ret == -EIOCBQUEUED) {
3182 } else if (ret == req->cqe.res || ret <= 0 || !force_nonblock ||
3183 (req->flags & REQ_F_NOWAIT) || !need_read_all(req)) {
3184 /* read all, failed, already did sync or don't want to retry */
3189 * Don't depend on the iter state matching what was consumed, or being
3190 * untouched in case of error. Restore it and we'll advance it
3191 * manually if we need to.
3193 iov_iter_restore(&s->iter, &s->iter_state);
3195 ret2 = io_setup_async_rw(req, iovec, s, true);
3200 io = req->async_data;
3203 * Now use our persistent iterator and state, if we aren't already.
3204 * We've restored and mapped the iter to match.
3209 * We end up here because of a partial read, either from
3210 * above or inside this loop. Advance the iter by the bytes
3211 * that were consumed.
3213 iov_iter_advance(&s->iter, ret);
3214 if (!iov_iter_count(&s->iter))
3216 io->bytes_done += ret;
3217 iov_iter_save_state(&s->iter, &s->iter_state);
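/*
 * io->bytes_done now carries the partial progress; kiocb_done() adds it back
 * onto the final result so the completion reflects the total bytes
 * transferred across retries.
 */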
3219 /* if we can retry, do so with the callbacks armed */
3220 if (!io_rw_should_retry(req)) {
3221 kiocb->ki_flags &= ~IOCB_WAITQ;
3226 * Now retry read with the IOCB_WAITQ parts set in the iocb. If
3227 * we get -EIOCBQUEUED, then we'll get a notification when the
3228 * desired page gets unlocked. We can also get a partial read
3229 * here, and if we do, then just retry at the new offset.
3231 ret = io_iter_do_read(rw, &s->iter);
3232 if (ret == -EIOCBQUEUED)
3233 return IOU_ISSUE_SKIP_COMPLETE;
3234 /* we got some bytes, but not all. retry. */
3235 kiocb->ki_flags &= ~IOCB_WAITQ;
3236 iov_iter_restore(&s->iter, &s->iter_state);
3239 kiocb_done(req, ret, issue_flags);
3241 /* it's faster to check here than to delegate to kfree */
3244 return IOU_ISSUE_SKIP_COMPLETE;
3247 static int io_write(struct io_kiocb *req, unsigned int issue_flags)
3249 struct io_rw *rw = io_kiocb_to_cmd(req);
3250 struct io_rw_state __s, *s = &__s;
3251 struct iovec *iovec;
3252 struct kiocb *kiocb = &rw->kiocb;
3253 bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
3257 if (!req_has_async_data(req)) {
3258 ret = io_import_iovec(WRITE, req, &iovec, s, issue_flags);
3259 if (unlikely(ret < 0))
3262 struct io_async_rw *io = req->async_data;
3265 iov_iter_restore(&s->iter, &s->iter_state);
3268 ret = io_rw_init_file(req, FMODE_WRITE);
3269 if (unlikely(ret)) {
3273 req->cqe.res = iov_iter_count(&s->iter);
3275 if (force_nonblock) {
3276 /* If the file doesn't support async, just async punt */
3277 if (unlikely(!io_file_supports_nowait(req)))
3280 /* file path doesn't support NOWAIT for non-direct IO */
3281 if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
3282 (req->flags & REQ_F_ISREG))
3285 kiocb->ki_flags |= IOCB_NOWAIT;
3287 /* Ensure we clear previously set non-block flag */
3288 kiocb->ki_flags &= ~IOCB_NOWAIT;
3291 ppos = io_kiocb_update_pos(req);
3293 ret = rw_verify_area(WRITE, req->file, ppos, req->cqe.res);
3298 * Open-code file_start_write here to grab freeze protection,
3299 * which will be released by another thread in
3300 * io_complete_rw(). Fool lockdep by telling it the lock got
3301 * released so that it doesn't complain about the held lock when
3302 * we return to userspace.
3304 if (req->flags & REQ_F_ISREG) {
3305 sb_start_write(file_inode(req->file)->i_sb);
3306 __sb_writers_release(file_inode(req->file)->i_sb,
3309 kiocb->ki_flags |= IOCB_WRITE;
3311 if (likely(req->file->f_op->write_iter))
3312 ret2 = call_write_iter(req->file, kiocb, &s->iter);
3313 else if (req->file->f_op->write)
3314 ret2 = loop_rw_iter(WRITE, rw, &s->iter);
3318 if (req->flags & REQ_F_REISSUE) {
3319 req->flags &= ~REQ_F_REISSUE;
3324 * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just
3325 * retry them without IOCB_NOWAIT.
3327 if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT))
3329 /* no retry on NONBLOCK nor RWF_NOWAIT */
3330 if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT))
3332 if (!force_nonblock || ret2 != -EAGAIN) {
3333 /* IOPOLL retry should happen for io-wq threads */
3334 if (ret2 == -EAGAIN && (req->ctx->flags & IORING_SETUP_IOPOLL))
3337 kiocb_done(req, ret2, issue_flags);
3338 ret = IOU_ISSUE_SKIP_COMPLETE;
3341 iov_iter_restore(&s->iter, &s->iter_state);
3342 ret = io_setup_async_rw(req, iovec, s, false);
3343 return ret ?: -EAGAIN;
3346 /* it's reportedly faster than delegating the null check to kfree() */
3353 * Note that when io_fixed_fd_install() returns an error value, it will
3354 * ensure fput() is called correspondingly.
3356 int io_fixed_fd_install(struct io_kiocb *req, unsigned int issue_flags,
3357 struct file *file, unsigned int file_slot)
3359 bool alloc_slot = file_slot == IORING_FILE_INDEX_ALLOC;
3360 struct io_ring_ctx *ctx = req->ctx;
3363 io_ring_submit_lock(ctx, issue_flags);
3366 ret = io_file_bitmap_get(ctx);
3367 if (unlikely(ret < 0))
3374 ret = io_install_fixed_file(req, file, issue_flags, file_slot);
3375 if (!ret && alloc_slot)
3378 io_ring_submit_unlock(ctx, issue_flags);
3379 if (unlikely(ret < 0))
3384 static int io_remove_buffers_prep(struct io_kiocb *req,
3385 const struct io_uring_sqe *sqe)
3387 struct io_provide_buf *p = io_kiocb_to_cmd(req);
3390 if (sqe->rw_flags || sqe->addr || sqe->len || sqe->off ||
3394 tmp = READ_ONCE(sqe->fd);
3395 if (!tmp || tmp > USHRT_MAX)
3398 memset(p, 0, sizeof(*p));
3400 p->bgid = READ_ONCE(sqe->buf_group);
3404 static int __io_remove_buffers(struct io_ring_ctx *ctx,
3405 struct io_buffer_list *bl, unsigned nbufs)
3409 /* shouldn't happen */
3413 if (bl->buf_nr_pages) {
3416 i = bl->buf_ring->tail - bl->head;
3417 for (j = 0; j < bl->buf_nr_pages; j++)
3418 unpin_user_page(bl->buf_pages[j]);
3419 kvfree(bl->buf_pages);
3420 bl->buf_pages = NULL;
3421 bl->buf_nr_pages = 0;
3422 /* make sure it's seen as empty */
3423 INIT_LIST_HEAD(&bl->buf_list);
3427 /* the head kbuf is the list itself */
3428 while (!list_empty(&bl->buf_list)) {
3429 struct io_buffer *nxt;
3431 nxt = list_first_entry(&bl->buf_list, struct io_buffer, list);
3432 list_del(&nxt->list);
3442 static int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags)
3444 struct io_provide_buf *p = io_kiocb_to_cmd(req);
3445 struct io_ring_ctx *ctx = req->ctx;
3446 struct io_buffer_list *bl;
3449 io_ring_submit_lock(ctx, issue_flags);
3452 bl = io_buffer_get_list(ctx, p->bgid);
3455 /* can't use provide/remove buffers command on mapped buffers */
3456 if (!bl->buf_nr_pages)
3457 ret = __io_remove_buffers(ctx, bl, p->nbufs);
3462 /* complete before unlock, IOPOLL may need the lock */
3463 io_req_set_res(req, ret, 0);
3464 __io_req_complete(req, issue_flags);
3465 io_ring_submit_unlock(ctx, issue_flags);
3466 return IOU_ISSUE_SKIP_COMPLETE;
3469 static int io_provide_buffers_prep(struct io_kiocb *req,
3470 const struct io_uring_sqe *sqe)
3472 unsigned long size, tmp_check;
3473 struct io_provide_buf *p = io_kiocb_to_cmd(req);
3476 if (sqe->rw_flags || sqe->splice_fd_in)
3479 tmp = READ_ONCE(sqe->fd);
3480 if (!tmp || tmp > USHRT_MAX)
3483 p->addr = READ_ONCE(sqe->addr);
3484 p->len = READ_ONCE(sqe->len);
3486 if (check_mul_overflow((unsigned long)p->len, (unsigned long)p->nbufs,
3489 if (check_add_overflow((unsigned long)p->addr, size, &tmp_check))
3492 size = (unsigned long)p->len * p->nbufs;
3493 if (!access_ok(u64_to_user_ptr(p->addr), size))
3496 p->bgid = READ_ONCE(sqe->buf_group);
3497 tmp = READ_ONCE(sqe->off);
3498 if (tmp > USHRT_MAX)
3504 static int io_refill_buffer_cache(struct io_ring_ctx *ctx)
3506 struct io_buffer *buf;
3511 * Completions that don't happen inline (eg not under uring_lock) will
3512 * add to ->io_buffers_comp. If we don't have any free buffers, check
3513 * the completion list and splice those entries first.
3515 if (!list_empty_careful(&ctx->io_buffers_comp)) {
3516 spin_lock(&ctx->completion_lock);
3517 if (!list_empty(&ctx->io_buffers_comp)) {
3518 list_splice_init(&ctx->io_buffers_comp,
3519 &ctx->io_buffers_cache);
3520 spin_unlock(&ctx->completion_lock);
3523 spin_unlock(&ctx->completion_lock);
3527 * No free buffers and no completion entries either. Allocate a new
3528 * page worth of buffer entries and add those to our freelist.
3530 page = alloc_page(GFP_KERNEL_ACCOUNT);
3534 list_add(&page->lru, &ctx->io_buffers_pages);
3536 buf = page_address(page);
3537 bufs_in_page = PAGE_SIZE / sizeof(*buf);
3538 while (bufs_in_page) {
3539 list_add_tail(&buf->list, &ctx->io_buffers_cache);
3547 static int io_add_buffers(struct io_ring_ctx *ctx, struct io_provide_buf *pbuf,
3548 struct io_buffer_list *bl)
3550 struct io_buffer *buf;
3551 u64 addr = pbuf->addr;
3552 int i, bid = pbuf->bid;
3554 for (i = 0; i < pbuf->nbufs; i++) {
3555 if (list_empty(&ctx->io_buffers_cache) &&
3556 io_refill_buffer_cache(ctx))
3558 buf = list_first_entry(&ctx->io_buffers_cache, struct io_buffer,
3560 list_move_tail(&buf->list, &bl->buf_list);
3562 buf->len = min_t(__u32, pbuf->len, MAX_RW_COUNT);
3564 buf->bgid = pbuf->bgid;
3570 return i ? 0 : -ENOMEM;
3573 static __cold int io_init_bl_list(struct io_ring_ctx *ctx)
3577 ctx->io_bl = kcalloc(BGID_ARRAY, sizeof(struct io_buffer_list),
3582 for (i = 0; i < BGID_ARRAY; i++) {
3583 INIT_LIST_HEAD(&ctx->io_bl[i].buf_list);
3584 ctx->io_bl[i].bgid = i;
3590 static int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags)
3592 struct io_provide_buf *p = io_kiocb_to_cmd(req);
3593 struct io_ring_ctx *ctx = req->ctx;
3594 struct io_buffer_list *bl;
3597 io_ring_submit_lock(ctx, issue_flags);
3599 if (unlikely(p->bgid < BGID_ARRAY && !ctx->io_bl)) {
3600 ret = io_init_bl_list(ctx);
3605 bl = io_buffer_get_list(ctx, p->bgid);
3606 if (unlikely(!bl)) {
3607 bl = kzalloc(sizeof(*bl), GFP_KERNEL);
3612 INIT_LIST_HEAD(&bl->buf_list);
3613 ret = io_buffer_add_list(ctx, bl, p->bgid);
3619 /* can't add buffers via this command for a mapped buffer ring */
3620 if (bl->buf_nr_pages) {
3625 ret = io_add_buffers(ctx, p, bl);
3629 /* complete before unlock, IOPOLL may need the lock */
3630 io_req_set_res(req, ret, 0);
3631 __io_req_complete(req, issue_flags);
3632 io_ring_submit_unlock(ctx, issue_flags);
3633 return IOU_ISSUE_SKIP_COMPLETE;
3636 static __maybe_unused int io_eopnotsupp_prep(struct io_kiocb *kiocb,
3637 const struct io_uring_sqe *sqe)
3642 struct io_poll_table {
3643 struct poll_table_struct pt;
3644 struct io_kiocb *req;
3649 #define IO_POLL_CANCEL_FLAG BIT(31)
3650 #define IO_POLL_REF_MASK GENMASK(30, 0)
3653 * If refs part of ->poll_refs (see IO_POLL_REF_MASK) is 0, it's free. We can
3654 * bump it and acquire ownership. Modifying a request while not owning it
3655 * is disallowed, which prevents races when enqueueing task_work and between
3656 * arming poll and wakeups.
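* Ownership is claimed with atomic_fetch_inc(): only the caller that observes
* the masked refcount at zero becomes the owner; everyone else merely leaves a
* reference behind for the owner to notice and act on.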
3658 static inline bool io_poll_get_ownership(struct io_kiocb *req)
3660 return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
3663 static void io_poll_mark_cancelled(struct io_kiocb *req)
3665 atomic_or(IO_POLL_CANCEL_FLAG, &req->poll_refs);
3668 static struct io_poll *io_poll_get_double(struct io_kiocb *req)
3670 /* pure poll stashes this in ->async_data, poll driven retry elsewhere */
3671 if (req->opcode == IORING_OP_POLL_ADD)
3672 return req->async_data;
3673 return req->apoll->double_poll;
3676 static struct io_poll *io_poll_get_single(struct io_kiocb *req)
3678 if (req->opcode == IORING_OP_POLL_ADD)
3679 return io_kiocb_to_cmd(req);
3680 return &req->apoll->poll;
3683 static void io_poll_req_insert(struct io_kiocb *req)
3685 struct io_ring_ctx *ctx = req->ctx;
3686 struct hlist_head *list;
3688 list = &ctx->cancel_hash[hash_long(req->cqe.user_data, ctx->cancel_hash_bits)];
3689 hlist_add_head(&req->hash_node, list);
3692 static void io_init_poll_iocb(struct io_poll *poll, __poll_t events,
3693 wait_queue_func_t wake_func)
3696 #define IO_POLL_UNMASK (EPOLLERR|EPOLLHUP|EPOLLNVAL|EPOLLRDHUP)
3697 /* mask in events that we always want/need */
3698 poll->events = events | IO_POLL_UNMASK;
3699 INIT_LIST_HEAD(&poll->wait.entry);
3700 init_waitqueue_func_entry(&poll->wait, wake_func);
3703 static inline void io_poll_remove_entry(struct io_poll *poll)
3705 struct wait_queue_head *head = smp_load_acquire(&poll->head);
3708 spin_lock_irq(&head->lock);
3709 list_del_init(&poll->wait.entry);
3711 spin_unlock_irq(&head->lock);
3715 static void io_poll_remove_entries(struct io_kiocb *req)
3718 * Nothing to do if neither of those flags is set. Avoid dipping
3719 * into the poll/apoll/double cachelines if we can.
3721 if (!(req->flags & (REQ_F_SINGLE_POLL | REQ_F_DOUBLE_POLL)))
3725 * While we hold the waitqueue lock and the waitqueue is nonempty,
3726 * wake_up_pollfree() will wait for us. However, taking the waitqueue
3727 * lock in the first place can race with the waitqueue being freed.
3729 * We solve this as eventpoll does: by taking advantage of the fact that
3730 * all users of wake_up_pollfree() will RCU-delay the actual free. If
3731 * we enter rcu_read_lock() and see that the pointer to the queue is
3732 * non-NULL, we can then lock it without the memory being freed out from under us.
3735 * Keep holding rcu_read_lock() as long as we hold the queue lock, in
3736 * case the caller deletes the entry from the queue, leaving it empty.
3737 * In that case, only RCU prevents the queue memory from being freed.
3740 if (req->flags & REQ_F_SINGLE_POLL)
3741 io_poll_remove_entry(io_poll_get_single(req));
3742 if (req->flags & REQ_F_DOUBLE_POLL)
3743 io_poll_remove_entry(io_poll_get_double(req));
3747 static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags);
3749 * All poll tw should go through this. Checks for poll events, manages
3750 * references, does rewait, etc.
3752 * Returns a negative error on failure. >0 when no action is required,
3753 * which means either a spurious wakeup or a served multishot CQE. 0 when
3754 * it's done with the request, in which case the mask is stored in req->cqe.res.
3756 static int io_poll_check_events(struct io_kiocb *req, bool *locked)
3758 struct io_ring_ctx *ctx = req->ctx;
3761 /* req->task == current here, checking PF_EXITING is safe */
3762 if (unlikely(req->task->flags & PF_EXITING))
3766 v = atomic_read(&req->poll_refs);
3768 /* tw handler should be the owner, and so have some references */
3769 if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK)))
3771 if (v & IO_POLL_CANCEL_FLAG)
3774 if (!req->cqe.res) {
3775 struct poll_table_struct pt = { ._key = req->apoll_events };
3776 req->cqe.res = vfs_poll(req->file, &pt) & req->apoll_events;
3779 if ((unlikely(!req->cqe.res)))
3781 if (req->apoll_events & EPOLLONESHOT)
3784 /* multishot, just fill a CQE and proceed */
3785 if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
3786 __poll_t mask = mangle_poll(req->cqe.res &
3790 spin_lock(&ctx->completion_lock);
3791 filled = io_fill_cqe_aux(ctx, req->cqe.user_data,
3792 mask, IORING_CQE_F_MORE);
3793 io_commit_cqring(ctx);
3794 spin_unlock(&ctx->completion_lock);
3796 io_cqring_ev_posted(ctx);
3802 io_tw_lock(req->ctx, locked);
3803 if (unlikely(req->task->flags & PF_EXITING))
3805 ret = io_issue_sqe(req,
3806 IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
3811 * Release all references, retry if someone tried to restart
3812 * task_work while we were executing it.
3814 } while (atomic_sub_return(v & IO_POLL_REF_MASK, &req->poll_refs));
3819 static void io_poll_task_func(struct io_kiocb *req, bool *locked)
3821 struct io_ring_ctx *ctx = req->ctx;
3824 ret = io_poll_check_events(req, locked);
3829 struct io_poll *poll = io_kiocb_to_cmd(req);
3831 req->cqe.res = mangle_poll(req->cqe.res & poll->events);
3837 io_poll_remove_entries(req);
3838 spin_lock(&ctx->completion_lock);
3839 hash_del(&req->hash_node);
3841 __io_req_complete_post(req);
3842 io_commit_cqring(ctx);
3843 spin_unlock(&ctx->completion_lock);
3844 io_cqring_ev_posted(ctx);
3847 static void io_apoll_task_func(struct io_kiocb *req, bool *locked)
3849 struct io_ring_ctx *ctx = req->ctx;
3852 ret = io_poll_check_events(req, locked);
3856 io_poll_remove_entries(req);
3857 spin_lock(&ctx->completion_lock);
3858 hash_del(&req->hash_node);
3859 spin_unlock(&ctx->completion_lock);
3862 io_req_task_submit(req, locked);
3864 io_req_complete_failed(req, ret);
3867 static void __io_poll_execute(struct io_kiocb *req, int mask,
3868 __poll_t __maybe_unused events)
3870 io_req_set_res(req, mask, 0);
3872 * This is useful for poll that is armed on behalf of another
3873 * request, and where the wakeup path could be on a different
3874 * CPU. We want to avoid pulling in req->apoll->events for that case.
3877 if (req->opcode == IORING_OP_POLL_ADD)
3878 req->io_task_work.func = io_poll_task_func;
3880 req->io_task_work.func = io_apoll_task_func;
3882 trace_io_uring_task_add(req->ctx, req, req->cqe.user_data, req->opcode, mask);
3883 io_req_task_work_add(req);
3886 static inline void io_poll_execute(struct io_kiocb *req, int res,
3889 if (io_poll_get_ownership(req))
3890 __io_poll_execute(req, res, events);
3893 static void io_poll_cancel_req(struct io_kiocb *req)
3895 io_poll_mark_cancelled(req);
3896 /* kick tw, which should complete the request */
3897 io_poll_execute(req, 0, 0);
3900 #define wqe_to_req(wait) ((void *)((unsigned long) (wait)->private & ~1))
3901 #define wqe_is_double(wait) ((unsigned long) (wait)->private & 1)
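/*
 * wait->private stores the io_kiocb pointer with the low bit used as a tag:
 * it is set on wait entries belonging to the second (double) poll entry,
 * which is how wqe_to_req() and wqe_is_double() tell them apart.
 */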
3902 #define IO_ASYNC_POLL_COMMON (EPOLLONESHOT | EPOLLPRI)
3904 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
3907 struct io_kiocb *req = wqe_to_req(wait);
3908 struct io_poll *poll = container_of(wait, struct io_poll, wait);
3909 __poll_t mask = key_to_poll(key);
3911 if (unlikely(mask & POLLFREE)) {
3912 io_poll_mark_cancelled(req);
3913 /* we have to kick tw in case it's not already */
3914 io_poll_execute(req, 0, poll->events);
3917 * If the waitqueue is being freed early but someone already holds
3918 * ownership over it, we have to tear down the request as
3919 * best we can. That means immediately removing the request from
3920 * its waitqueue and preventing all further accesses to the
3921 * waitqueue via the request.
3923 list_del_init(&poll->wait.entry);
3926 * Careful: this *must* be the last step, since as soon
3927 * as req->head is NULL'ed out, the request can be
3928 * completed and freed, since aio_poll_complete_work()
3929 * will no longer need to take the waitqueue lock.
3931 smp_store_release(&poll->head, NULL);
3935 /* for instances that support it check for an event match first */
3936 if (mask && !(mask & (poll->events & ~IO_ASYNC_POLL_COMMON)))
3939 if (io_poll_get_ownership(req)) {
3940 /* optional, saves extra locking for removal in tw handler */
3941 if (mask && poll->events & EPOLLONESHOT) {
3942 list_del_init(&poll->wait.entry);
3944 if (wqe_is_double(wait))
3945 req->flags &= ~REQ_F_DOUBLE_POLL;
3947 req->flags &= ~REQ_F_SINGLE_POLL;
3949 __io_poll_execute(req, mask, poll->events);
3954 static void __io_queue_proc(struct io_poll *poll, struct io_poll_table *pt,
3955 struct wait_queue_head *head,
3956 struct io_poll **poll_ptr)
3958 struct io_kiocb *req = pt->req;
3959 unsigned long wqe_private = (unsigned long) req;
3962 * The file being polled uses multiple waitqueues for poll handling
3963 * (e.g. one for read, one for write). Set up a separate io_poll if this happens.
3966 if (unlikely(pt->nr_entries)) {
3967 struct io_poll *first = poll;
3969 /* double add on the same waitqueue head, ignore */
3970 if (first->head == head)
3972 /* already have a 2nd entry, fail a third attempt */
3974 if ((*poll_ptr)->head == head)
3976 pt->error = -EINVAL;
3980 poll = kmalloc(sizeof(*poll), GFP_ATOMIC);
3982 pt->error = -ENOMEM;
3985 /* mark as double wq entry */
3986 wqe_private |= 1;
3987 req->flags |= REQ_F_DOUBLE_POLL;
3988 io_init_poll_iocb(poll, first->events, first->wait.func);
3990 if (req->opcode == IORING_OP_POLL_ADD)
3991 req->flags |= REQ_F_ASYNC_DATA;
3994 req->flags |= REQ_F_SINGLE_POLL;
3997 poll->wait.private = (void *) wqe_private;
3999 if (poll->events & EPOLLEXCLUSIVE)
4000 add_wait_queue_exclusive(head, &poll->wait);
4002 add_wait_queue(head, &poll->wait);
4005 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
4006 struct poll_table_struct *p)
4008 struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
4009 struct io_poll *poll = io_kiocb_to_cmd(pt->req);
4011 __io_queue_proc(poll, pt, head,
4012 (struct io_poll **) &pt->req->async_data);
4015 static int __io_arm_poll_handler(struct io_kiocb *req,
4016 struct io_poll *poll,
4017 struct io_poll_table *ipt, __poll_t mask)
4019 struct io_ring_ctx *ctx = req->ctx;
4022 INIT_HLIST_NODE(&req->hash_node);
4023 req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
4024 io_init_poll_iocb(poll, mask, io_poll_wake);
4025 poll->file = req->file;
4027 req->apoll_events = poll->events;
4029 ipt->pt._key = mask;
4032 ipt->nr_entries = 0;
4035 * Take the ownership to delay any tw execution up until we're done
4036 * with poll arming. see io_poll_get_ownership().
4038 atomic_set(&req->poll_refs, 1);
4039 mask = vfs_poll(req->file, &ipt->pt) & poll->events;
4041 if (mask && (poll->events & EPOLLONESHOT)) {
4042 io_poll_remove_entries(req);
4043 /* no one else has access to the req, forget about the ref */
4046 if (!mask && unlikely(ipt->error || !ipt->nr_entries)) {
4047 io_poll_remove_entries(req);
4049 ipt->error = -EINVAL;
4053 spin_lock(&ctx->completion_lock);
4054 io_poll_req_insert(req);
4055 spin_unlock(&ctx->completion_lock);
4058 /* can't multishot if failed, just queue the event we've got */
4059 if (unlikely(ipt->error || !ipt->nr_entries)) {
4060 poll->events |= EPOLLONESHOT;
4061 req->apoll_events |= EPOLLONESHOT;
4064 __io_poll_execute(req, mask, poll->events);
4069 * Release ownership. If someone tried to queue a tw while it was
4070 * locked, kick it off for them.
4072 v = atomic_dec_return(&req->poll_refs);
4073 if (unlikely(v & IO_POLL_REF_MASK))
4074 __io_poll_execute(req, 0, poll->events);
4078 static void io_async_queue_proc(struct file *file, struct wait_queue_head *head,
4079 struct poll_table_struct *p)
4081 struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
4082 struct async_poll *apoll = pt->req->apoll;
4084 __io_queue_proc(&apoll->poll, pt, head, &apoll->double_poll);
4093 static int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags)
4095 const struct io_op_def *def = &io_op_defs[req->opcode];
4096 struct io_ring_ctx *ctx = req->ctx;
4097 struct async_poll *apoll;
4098 struct io_poll_table ipt;
4099 __poll_t mask = POLLPRI | POLLERR;
4102 if (!def->pollin && !def->pollout)
4103 return IO_APOLL_ABORTED;
4104 if (!file_can_poll(req->file))
4105 return IO_APOLL_ABORTED;
4106 if ((req->flags & (REQ_F_POLLED|REQ_F_PARTIAL_IO)) == REQ_F_POLLED)
4107 return IO_APOLL_ABORTED;
4108 if (!(req->flags & REQ_F_APOLL_MULTISHOT))
4109 mask |= EPOLLONESHOT;
4112 mask |= EPOLLIN | EPOLLRDNORM;
4114 /* If reading from MSG_ERRQUEUE using recvmsg, ignore POLLIN */
4115 if (req->flags & REQ_F_CLEAR_POLLIN)
4118 mask |= EPOLLOUT | EPOLLWRNORM;
4120 if (def->poll_exclusive)
4121 mask |= EPOLLEXCLUSIVE;
4122 if (req->flags & REQ_F_POLLED) {
4124 kfree(apoll->double_poll);
4125 } else if (!(issue_flags & IO_URING_F_UNLOCKED) &&
4126 !list_empty(&ctx->apoll_cache)) {
4127 apoll = list_first_entry(&ctx->apoll_cache, struct async_poll,
4129 list_del_init(&apoll->poll.wait.entry);
4131 apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC);
4132 if (unlikely(!apoll))
4133 return IO_APOLL_ABORTED;
4135 apoll->double_poll = NULL;
4137 req->flags |= REQ_F_POLLED;
4138 ipt.pt._qproc = io_async_queue_proc;
4140 io_kbuf_recycle(req, issue_flags);
4142 ret = __io_arm_poll_handler(req, &apoll->poll, &ipt, mask);
4143 if (ret || ipt.error)
4144 return ret ? IO_APOLL_READY : IO_APOLL_ABORTED;
4146 trace_io_uring_poll_arm(ctx, req, req->cqe.user_data, req->opcode,
4147 mask, apoll->poll.events);
4152 * Returns true if we found and killed one or more poll requests
4154 static __cold bool io_poll_remove_all(struct io_ring_ctx *ctx,
4155 struct task_struct *tsk, bool cancel_all)
4157 struct hlist_node *tmp;
4158 struct io_kiocb *req;
4162 spin_lock(&ctx->completion_lock);
4163 for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
4164 struct hlist_head *list;
4166 list = &ctx->cancel_hash[i];
4167 hlist_for_each_entry_safe(req, tmp, list, hash_node) {
4168 if (io_match_task_safe(req, tsk, cancel_all)) {
4169 hlist_del_init(&req->hash_node);
4170 io_poll_cancel_req(req);
4175 spin_unlock(&ctx->completion_lock);
4179 static struct io_kiocb *io_poll_find(struct io_ring_ctx *ctx, bool poll_only,
4180 struct io_cancel_data *cd)
4181 __must_hold(&ctx->completion_lock)
4183 struct hlist_head *list;
4184 struct io_kiocb *req;
4186 list = &ctx->cancel_hash[hash_long(cd->data, ctx->cancel_hash_bits)];
4187 hlist_for_each_entry(req, list, hash_node) {
4188 if (cd->data != req->cqe.user_data)
4190 if (poll_only && req->opcode != IORING_OP_POLL_ADD)
4192 if (cd->flags & IORING_ASYNC_CANCEL_ALL) {
4193 if (cd->seq == req->work.cancel_seq)
4195 req->work.cancel_seq = cd->seq;
4202 static struct io_kiocb *io_poll_file_find(struct io_ring_ctx *ctx,
4203 struct io_cancel_data *cd)
4204 __must_hold(&ctx->completion_lock)
4206 struct io_kiocb *req;
4209 for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
4210 struct hlist_head *list;
4212 list = &ctx->cancel_hash[i];
4213 hlist_for_each_entry(req, list, hash_node) {
4214 if (!(cd->flags & IORING_ASYNC_CANCEL_ANY) &&
4215 req->file != cd->file)
4217 if (cd->seq == req->work.cancel_seq)
4219 req->work.cancel_seq = cd->seq;
4226 static bool io_poll_disarm(struct io_kiocb *req)
4227 __must_hold(&ctx->completion_lock)
4229 if (!io_poll_get_ownership(req))
4231 io_poll_remove_entries(req);
4232 hash_del(&req->hash_node);
4236 static int io_poll_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd)
4237 __must_hold(&ctx->completion_lock)
4239 struct io_kiocb *req;
4241 if (cd->flags & (IORING_ASYNC_CANCEL_FD|IORING_ASYNC_CANCEL_ANY))
4242 req = io_poll_file_find(ctx, cd);
4244 req = io_poll_find(ctx, false, cd);
4247 io_poll_cancel_req(req);
4251 static __poll_t io_poll_parse_events(const struct io_uring_sqe *sqe,
4256 events = READ_ONCE(sqe->poll32_events);
4258 events = swahw32(events);
4260 if (!(flags & IORING_POLL_ADD_MULTI))
4261 events |= EPOLLONESHOT;
4262 return demangle_poll(events) | (events & (EPOLLEXCLUSIVE|EPOLLONESHOT));
4265 static int io_poll_remove_prep(struct io_kiocb *req,
4266 const struct io_uring_sqe *sqe)
4268 struct io_poll_update *upd = io_kiocb_to_cmd(req);
4271 if (sqe->buf_index || sqe->splice_fd_in)
4273 flags = READ_ONCE(sqe->len);
4274 if (flags & ~(IORING_POLL_UPDATE_EVENTS | IORING_POLL_UPDATE_USER_DATA |
4275 IORING_POLL_ADD_MULTI))
4277 /* meaningless without update */
4278 if (flags == IORING_POLL_ADD_MULTI)
4281 upd->old_user_data = READ_ONCE(sqe->addr);
4282 upd->update_events = flags & IORING_POLL_UPDATE_EVENTS;
4283 upd->update_user_data = flags & IORING_POLL_UPDATE_USER_DATA;
4285 upd->new_user_data = READ_ONCE(sqe->off);
4286 if (!upd->update_user_data && upd->new_user_data)
4288 if (upd->update_events)
4289 upd->events = io_poll_parse_events(sqe, flags);
4290 else if (sqe->poll32_events)
4296 static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
4298 struct io_poll *poll = io_kiocb_to_cmd(req);
4301 if (sqe->buf_index || sqe->off || sqe->addr)
4303 flags = READ_ONCE(sqe->len);
4304 if (flags & ~IORING_POLL_ADD_MULTI)
4306 if ((flags & IORING_POLL_ADD_MULTI) && (req->flags & REQ_F_CQE_SKIP))
4309 io_req_set_refcount(req);
4310 poll->events = io_poll_parse_events(sqe, flags);
4314 static int io_poll_add(struct io_kiocb *req, unsigned int issue_flags)
4316 struct io_poll *poll = io_kiocb_to_cmd(req);
4317 struct io_poll_table ipt;
4320 ipt.pt._qproc = io_poll_queue_proc;
4322 ret = __io_arm_poll_handler(req, poll, &ipt, poll->events);
4324 io_req_set_res(req, ret, 0);
4332 return IOU_ISSUE_SKIP_COMPLETE;
4335 static int io_poll_remove(struct io_kiocb *req, unsigned int issue_flags)
4337 struct io_poll_update *poll_update = io_kiocb_to_cmd(req);
4338 struct io_cancel_data cd = { .data = poll_update->old_user_data, };
4339 struct io_ring_ctx *ctx = req->ctx;
4340 struct io_kiocb *preq;
4344 spin_lock(&ctx->completion_lock);
4345 preq = io_poll_find(ctx, true, &cd);
4346 if (!preq || !io_poll_disarm(preq)) {
4347 spin_unlock(&ctx->completion_lock);
4348 ret = preq ? -EALREADY : -ENOENT;
4351 spin_unlock(&ctx->completion_lock);
4353 if (poll_update->update_events || poll_update->update_user_data) {
4354 /* only replace the event mask bits, keep the behavior flags */
4355 if (poll_update->update_events) {
4356 struct io_poll *poll = io_kiocb_to_cmd(preq);
4358 poll->events &= ~0xffff;
4359 poll->events |= poll_update->events & 0xffff;
4360 poll->events |= IO_POLL_UNMASK;
4362 if (poll_update->update_user_data)
4363 preq->cqe.user_data = poll_update->new_user_data;
4365 ret2 = io_poll_add(preq, issue_flags);
4366 /* successfully updated, don't complete poll request */
4367 if (!ret2 || ret2 == -EIOCBQUEUED)
4372 io_req_set_res(preq, -ECANCELED, 0);
4373 locked = !(issue_flags & IO_URING_F_UNLOCKED);
4374 io_req_task_complete(preq, &locked);
4380 /* complete update request, we're done with it */
4381 io_req_set_res(req, ret, 0);
4385 static bool io_cancel_cb(struct io_wq_work *work, void *data)
4387 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4388 struct io_cancel_data *cd = data;
4390 if (req->ctx != cd->ctx)
4392 if (cd->flags & IORING_ASYNC_CANCEL_ANY) {
4394 } else if (cd->flags & IORING_ASYNC_CANCEL_FD) {
4395 if (req->file != cd->file)
4398 if (req->cqe.user_data != cd->data)
4401 if (cd->flags & (IORING_ASYNC_CANCEL_ALL|IORING_ASYNC_CANCEL_ANY)) {
4402 if (cd->seq == req->work.cancel_seq)
4404 req->work.cancel_seq = cd->seq;
4409 static int io_async_cancel_one(struct io_uring_task *tctx,
4410 struct io_cancel_data *cd)
4412 enum io_wq_cancel cancel_ret;
4416 if (!tctx || !tctx->io_wq)
4419 all = cd->flags & (IORING_ASYNC_CANCEL_ALL|IORING_ASYNC_CANCEL_ANY);
4420 cancel_ret = io_wq_cancel_cb(tctx->io_wq, io_cancel_cb, cd, all);
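/*
 * Translate the io-wq result: IO_WQ_CANCEL_OK means the work was cancelled
 * before running (return 0), IO_WQ_CANCEL_RUNNING means it is already
 * executing (-EALREADY), IO_WQ_CANCEL_NOTFOUND means it was never queued
 * here (-ENOENT).
 */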
4421 switch (cancel_ret) {
4422 case IO_WQ_CANCEL_OK:
4425 case IO_WQ_CANCEL_RUNNING:
4428 case IO_WQ_CANCEL_NOTFOUND:
4436 int io_try_cancel(struct io_kiocb *req, struct io_cancel_data *cd)
4438 struct io_ring_ctx *ctx = req->ctx;
4441 WARN_ON_ONCE(!io_wq_current_is_worker() && req->task != current);
4443 ret = io_async_cancel_one(req->task->io_uring, cd);
4445 * Fall-through even for -EALREADY, as we may have a poll armed
4446 * that needs unarming.
4451 spin_lock(&ctx->completion_lock);
4452 ret = io_poll_cancel(ctx, cd);
4455 if (!(cd->flags & IORING_ASYNC_CANCEL_FD))
4456 ret = io_timeout_cancel(ctx, cd);
4458 spin_unlock(&ctx->completion_lock);
4462 #define CANCEL_FLAGS (IORING_ASYNC_CANCEL_ALL | IORING_ASYNC_CANCEL_FD | \
4463 IORING_ASYNC_CANCEL_ANY)
4465 static int io_async_cancel_prep(struct io_kiocb *req,
4466 const struct io_uring_sqe *sqe)
4468 struct io_cancel *cancel = io_kiocb_to_cmd(req);
4470 if (unlikely(req->flags & REQ_F_BUFFER_SELECT))
4472 if (sqe->off || sqe->len || sqe->splice_fd_in)
4475 cancel->addr = READ_ONCE(sqe->addr);
4476 cancel->flags = READ_ONCE(sqe->cancel_flags);
4477 if (cancel->flags & ~CANCEL_FLAGS)
4479 if (cancel->flags & IORING_ASYNC_CANCEL_FD) {
4480 if (cancel->flags & IORING_ASYNC_CANCEL_ANY)
4482 cancel->fd = READ_ONCE(sqe->fd);
4488 static int __io_async_cancel(struct io_cancel_data *cd, struct io_kiocb *req,
4489 unsigned int issue_flags)
4491 bool all = cd->flags & (IORING_ASYNC_CANCEL_ALL|IORING_ASYNC_CANCEL_ANY);
4492 struct io_ring_ctx *ctx = cd->ctx;
4493 struct io_tctx_node *node;
4497 ret = io_try_cancel(req, cd);
4505 /* slow path, try all io-wq's */
4506 io_ring_submit_lock(ctx, issue_flags);
4508 list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
4509 struct io_uring_task *tctx = node->task->io_uring;
4511 ret = io_async_cancel_one(tctx, cd);
4512 if (ret != -ENOENT) {
4518 io_ring_submit_unlock(ctx, issue_flags);
4519 return all ? nr : ret;
4522 static int io_async_cancel(struct io_kiocb *req, unsigned int issue_flags)
4524 struct io_cancel *cancel = io_kiocb_to_cmd(req);
4525 struct io_cancel_data cd = {
4527 .data = cancel->addr,
4528 .flags = cancel->flags,
4529 .seq = atomic_inc_return(&req->ctx->cancel_seq),
4533 if (cd.flags & IORING_ASYNC_CANCEL_FD) {
4534 if (req->flags & REQ_F_FIXED_FILE)
4535 req->file = io_file_get_fixed(req, cancel->fd,
4538 req->file = io_file_get_normal(req, cancel->fd);
4543 cd.file = req->file;
4546 ret = __io_async_cancel(&cd, req, issue_flags);
4550 io_req_set_res(req, ret, 0);
4554 static int io_files_update_prep(struct io_kiocb *req,
4555 const struct io_uring_sqe *sqe)
4557 struct io_rsrc_update *up = io_kiocb_to_cmd(req);
4559 if (unlikely(req->flags & (REQ_F_FIXED_FILE | REQ_F_BUFFER_SELECT)))
4561 if (sqe->rw_flags || sqe->splice_fd_in)
4564 up->offset = READ_ONCE(sqe->off);
4565 up->nr_args = READ_ONCE(sqe->len);
4568 up->arg = READ_ONCE(sqe->addr);
4572 static int io_files_update_with_index_alloc(struct io_kiocb *req,
4573 unsigned int issue_flags)
4575 struct io_rsrc_update *up = io_kiocb_to_cmd(req);
4576 __s32 __user *fds = u64_to_user_ptr(up->arg);
4581 if (!req->ctx->file_data)
4584 for (done = 0; done < up->nr_args; done++) {
4585 if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
4595 ret = io_fixed_fd_install(req, issue_flags, file,
4596 IORING_FILE_INDEX_ALLOC);
4599 if (copy_to_user(&fds[done], &ret, sizeof(ret))) {
4600 __io_close_fixed(req, issue_flags, ret);
4611 static int io_files_update(struct io_kiocb *req, unsigned int issue_flags)
4613 struct io_rsrc_update *up = io_kiocb_to_cmd(req);
4614 struct io_ring_ctx *ctx = req->ctx;
4615 struct io_uring_rsrc_update2 up2;
4618 up2.offset = up->offset;
4625 if (up->offset == IORING_FILE_INDEX_ALLOC) {
4626 ret = io_files_update_with_index_alloc(req, issue_flags);
4628 io_ring_submit_lock(ctx, issue_flags);
4629 ret = __io_register_rsrc_update(ctx, IORING_RSRC_FILE,
4631 io_ring_submit_unlock(ctx, issue_flags);
4636 io_req_set_res(req, ret, 0);
4640 static int io_req_prep_async(struct io_kiocb *req)
4642 const struct io_op_def *def = &io_op_defs[req->opcode];
4644 /* assign early for deferred execution for non-fixed file */
4645 if (def->needs_file && !(req->flags & REQ_F_FIXED_FILE))
4646 req->file = io_file_get_normal(req, req->cqe.fd);
4647 if (!def->prep_async)
4649 if (WARN_ON_ONCE(req_has_async_data(req)))
4651 if (io_alloc_async_data(req))
4654 return def->prep_async(req);
4657 static u32 io_get_sequence(struct io_kiocb *req)
4659 u32 seq = req->ctx->cached_sq_head;
4660 struct io_kiocb *cur;
4662 /* need original cached_sq_head, but it was increased for each req */
4663 io_for_each_link(cur, req)
4668 static __cold void io_drain_req(struct io_kiocb *req)
4670 struct io_ring_ctx *ctx = req->ctx;
4671 struct io_defer_entry *de;
4673 u32 seq = io_get_sequence(req);
4675 /* Still need to defer if there is a pending req in the defer list. */
4676 spin_lock(&ctx->completion_lock);
4677 if (!req_need_defer(req, seq) && list_empty_careful(&ctx->defer_list)) {
4678 spin_unlock(&ctx->completion_lock);
4680 ctx->drain_active = false;
4681 io_req_task_queue(req);
4684 spin_unlock(&ctx->completion_lock);
4686 ret = io_req_prep_async(req);
4689 io_req_complete_failed(req, ret);
4692 io_prep_async_link(req);
4693 de = kmalloc(sizeof(*de), GFP_KERNEL);
4699 spin_lock(&ctx->completion_lock);
4700 if (!req_need_defer(req, seq) && list_empty(&ctx->defer_list)) {
4701 spin_unlock(&ctx->completion_lock);
4706 trace_io_uring_defer(ctx, req, req->cqe.user_data, req->opcode);
4709 list_add_tail(&de->list, &ctx->defer_list);
4710 spin_unlock(&ctx->completion_lock);
4713 static void io_clean_op(struct io_kiocb *req)
4715 if (req->flags & REQ_F_BUFFER_SELECTED) {
4716 spin_lock(&req->ctx->completion_lock);
4717 io_put_kbuf_comp(req);
4718 spin_unlock(&req->ctx->completion_lock);
4721 if (req->flags & REQ_F_NEED_CLEANUP) {
4722 const struct io_op_def *def = &io_op_defs[req->opcode];
4727 if ((req->flags & REQ_F_POLLED) && req->apoll) {
4728 kfree(req->apoll->double_poll);
4732 if (req->flags & REQ_F_INFLIGHT) {
4733 struct io_uring_task *tctx = req->task->io_uring;
4735 atomic_dec(&tctx->inflight_tracked);
4737 if (req->flags & REQ_F_CREDS)
4738 put_cred(req->creds);
4739 if (req->flags & REQ_F_ASYNC_DATA) {
4740 kfree(req->async_data);
4741 req->async_data = NULL;
4743 req->flags &= ~IO_REQ_CLEAN_FLAGS;
4746 static bool io_assign_file(struct io_kiocb *req, unsigned int issue_flags)
4748 if (req->file || !io_op_defs[req->opcode].needs_file)
4751 if (req->flags & REQ_F_FIXED_FILE)
4752 req->file = io_file_get_fixed(req, req->cqe.fd, issue_flags);
4754 req->file = io_file_get_normal(req, req->cqe.fd);
4759 static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
4761 const struct io_op_def *def = &io_op_defs[req->opcode];
4762 const struct cred *creds = NULL;
4765 if (unlikely(!io_assign_file(req, issue_flags)))
4768 if (unlikely((req->flags & REQ_F_CREDS) && req->creds != current_cred()))
4769 creds = override_creds(req->creds);
4771 if (!def->audit_skip)
4772 audit_uring_entry(req->opcode);
4774 ret = def->issue(req, issue_flags);
4776 if (!def->audit_skip)
4777 audit_uring_exit(!ret, ret);
4780 revert_creds(creds);
4783 __io_req_complete(req, issue_flags);
4784 else if (ret != IOU_ISSUE_SKIP_COMPLETE)
4787 /* If the op doesn't have a file, we're not polling for it */
4788 if ((req->ctx->flags & IORING_SETUP_IOPOLL) && req->file)
4789 io_iopoll_req_issued(req, issue_flags);
4794 static struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
4796 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4798 req = io_put_req_find_next(req);
4799 return req ? &req->work : NULL;
4802 static void io_wq_submit_work(struct io_wq_work *work)
4804 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4805 const struct io_op_def *def = &io_op_defs[req->opcode];
4806 unsigned int issue_flags = IO_URING_F_UNLOCKED;
4807 bool needs_poll = false;
4808 int ret = 0, err = -ECANCELED;
4810 /* one will be dropped by ->io_free_work() after returning to io-wq */
4811 if (!(req->flags & REQ_F_REFCOUNT))
4812 __io_req_set_refcount(req, 2);
4816 io_arm_ltimeout(req);
4818 /* either cancelled or io-wq is dying, so don't touch tctx->iowq */
4819 if (work->flags & IO_WQ_WORK_CANCEL) {
4821 io_req_task_queue_fail(req, err);
4824 if (!io_assign_file(req, issue_flags)) {
4826 work->flags |= IO_WQ_WORK_CANCEL;
4830 if (req->flags & REQ_F_FORCE_ASYNC) {
4831 bool opcode_poll = def->pollin || def->pollout;
4833 if (opcode_poll && file_can_poll(req->file)) {
4835 issue_flags |= IO_URING_F_NONBLOCK;
4840 ret = io_issue_sqe(req, issue_flags);
4844 * We can get EAGAIN for iopolled IO even though we're
4845 * forcing a sync submission from here, since we can't
4846 * wait for request slots on the block side.
4849 if (!(req->ctx->flags & IORING_SETUP_IOPOLL))
4855 if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK)
4857 /* aborted or ready, in either case retry blocking */
4859 issue_flags &= ~IO_URING_F_NONBLOCK;
4862 /* avoid locking problems by failing it from a clean context */
4864 io_req_task_queue_fail(req, ret);
4867 static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
4870 struct io_fixed_file *slot = io_fixed_file_slot(&ctx->file_table, index);
4872 return (struct file *) (slot->file_ptr & FFS_MASK);
4875 static void io_fixed_file_set(struct io_fixed_file *file_slot, struct file *file)
4877 unsigned long file_ptr = (unsigned long) file;
4879 file_ptr |= io_file_get_flags(file);
4880 file_slot->file_ptr = file_ptr;
4883 inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
4884 unsigned int issue_flags)
4886 struct io_ring_ctx *ctx = req->ctx;
4887 struct file *file = NULL;
4888 unsigned long file_ptr;
4890 io_ring_submit_lock(ctx, issue_flags);
4892 if (unlikely((unsigned int)fd >= ctx->nr_user_files))
4894 fd = array_index_nospec(fd, ctx->nr_user_files);
4895 file_ptr = io_fixed_file_slot(&ctx->file_table, fd)->file_ptr;
4896 file = (struct file *) (file_ptr & FFS_MASK);
4897 file_ptr &= ~FFS_MASK;
4898 /* mask in overlapping REQ_F and FFS bits */
4899 req->flags |= (file_ptr << REQ_F_SUPPORT_NOWAIT_BIT);
4900 io_req_set_rsrc_node(req, ctx, 0);
4901 WARN_ON_ONCE(file && !test_bit(fd, ctx->file_table.bitmap));
4903 io_ring_submit_unlock(ctx, issue_flags);
4907 struct file *io_file_get_normal(struct io_kiocb *req, int fd)
4909 struct file *file = fget(fd);
4911 trace_io_uring_file_get(req->ctx, req, req->cqe.user_data, fd);
4913 /* we don't allow fixed io_uring files */
4914 if (file && file->f_op == &io_uring_fops)
4915 io_req_track_inflight(req);
4919 static void io_queue_async(struct io_kiocb *req, int ret)
4920 __must_hold(&req->ctx->uring_lock)
4922 struct io_kiocb *linked_timeout;
4924 if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
4925 io_req_complete_failed(req, ret);
4929 linked_timeout = io_prep_linked_timeout(req);
4931 switch (io_arm_poll_handler(req, 0)) {
4932 case IO_APOLL_READY:
4933 io_req_task_queue(req);
4935 case IO_APOLL_ABORTED:
4937 * Queued up for async execution, worker will release
4938 * submit reference when the iocb is actually submitted.
4940 io_kbuf_recycle(req, 0);
4941 io_queue_iowq(req, NULL);
4948 io_queue_linked_timeout(linked_timeout);
4951 static inline void io_queue_sqe(struct io_kiocb *req)
4952 __must_hold(&req->ctx->uring_lock)
4956 ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
4958 if (req->flags & REQ_F_COMPLETE_INLINE) {
4959 io_req_add_compl_list(req);
4963 * We async punt it if the file wasn't marked NOWAIT, or if the file
4964 * doesn't support non-blocking read/write attempts
4967 io_arm_ltimeout(req);
4969 io_queue_async(req, ret);
4972 static void io_queue_sqe_fallback(struct io_kiocb *req)
4973 __must_hold(&req->ctx->uring_lock)
4975 if (unlikely(req->flags & REQ_F_FAIL)) {
4977 * We don't submit; fail them all. For that, replace hardlinks
4978 * with normal links. Extra REQ_F_LINK is tolerated.
4980 req->flags &= ~REQ_F_HARDLINK;
4981 req->flags |= REQ_F_LINK;
4982 io_req_complete_failed(req, req->cqe.res);
4983 } else if (unlikely(req->ctx->drain_active)) {
4986 int ret = io_req_prep_async(req);
4989 io_req_complete_failed(req, ret);
4991 io_queue_iowq(req, NULL);
4996 * Check SQE restrictions (opcode and flags).
4998 * Returns 'true' if SQE is allowed, 'false' otherwise.
5000 static inline bool io_check_restriction(struct io_ring_ctx *ctx,
5001 struct io_kiocb *req,
5002 unsigned int sqe_flags)
5004 if (!test_bit(req->opcode, ctx->restrictions.sqe_op))
5007 if ((sqe_flags & ctx->restrictions.sqe_flags_required) !=
5008 ctx->restrictions.sqe_flags_required)
5011 if (sqe_flags & ~(ctx->restrictions.sqe_flags_allowed |
5012 ctx->restrictions.sqe_flags_required))
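/*
 * Worked example of the checks above (illustrative only): suppose a
 * restricted ring was registered with sqe_flags_allowed = IOSQE_ASYNC and
 * sqe_flags_required = IOSQE_FIXED_FILE. Then:
 *
 *	IOSQE_FIXED_FILE			-> allowed
 *	IOSQE_FIXED_FILE | IOSQE_ASYNC		-> allowed
 *	IOSQE_ASYNC				-> rejected, required flag missing
 *	IOSQE_FIXED_FILE | IOSQE_IO_LINK	-> rejected, IO_LINK is neither
 *						   allowed nor required
 *
 * The opcode itself must also have been enabled via the sqe_op bitmap,
 * which is the first check above.
 */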
5018 static void io_init_req_drain(struct io_kiocb *req)
5020 struct io_ring_ctx *ctx = req->ctx;
5021 struct io_kiocb *head = ctx->submit_state.link.head;
5023 ctx->drain_active = true;
5026 * If we need to drain a request in the middle of a link, drain
5027 * the head request and the next request/link after the current
5028 * link. Considering sequential execution of links,
5029 * REQ_F_IO_DRAIN will be maintained for every request of our link.
5032 head->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
5033 ctx->drain_next = true;
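/*
 * Userspace opts into this with IOSQE_IO_DRAIN. A minimal sketch, assuming
 * liburing's helpers (illustration only, not part of this file):
 *
 *	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
 *
 *	io_uring_prep_fsync(sqe, fd, 0);
 *	sqe->flags |= IOSQE_IO_DRAIN;	// starts only after all prior SQEs
 *					// complete; later SQEs wait for it
 *	io_uring_submit(&ring);
 */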
5037 static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
5038 const struct io_uring_sqe *sqe)
5039 __must_hold(&ctx->uring_lock)
5041 const struct io_op_def *def;
5042 unsigned int sqe_flags;
5046 /* req is partially pre-initialised, see io_preinit_req() */
5047 req->opcode = opcode = READ_ONCE(sqe->opcode);
5048 /* same numerical values as the corresponding REQ_F_*, safe to copy */
5049 req->flags = sqe_flags = READ_ONCE(sqe->flags);
5050 req->cqe.user_data = READ_ONCE(sqe->user_data);
5052 req->rsrc_node = NULL;
5053 req->task = current;
5055 if (unlikely(opcode >= IORING_OP_LAST)) {
5059 def = &io_op_defs[opcode];
5060 if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) {
5061 /* enforce forwards compatibility on users */
5062 if (sqe_flags & ~SQE_VALID_FLAGS)
5064 if (sqe_flags & IOSQE_BUFFER_SELECT) {
5065 if (!def->buffer_select)
5067 req->buf_index = READ_ONCE(sqe->buf_group);
5069 if (sqe_flags & IOSQE_CQE_SKIP_SUCCESS)
5070 ctx->drain_disabled = true;
5071 if (sqe_flags & IOSQE_IO_DRAIN) {
5072 if (ctx->drain_disabled)
5074 io_init_req_drain(req);
5077 if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) {
5078 if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags))
5080 /* knock it to the slow queue path, will be drained there */
5081 if (ctx->drain_active)
5082 req->flags |= REQ_F_FORCE_ASYNC;
5083 /* if there is no link, we're at "next" request and need to drain */
5084 if (unlikely(ctx->drain_next) && !ctx->submit_state.link.head) {
5085 ctx->drain_next = false;
5086 ctx->drain_active = true;
5087 req->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
5091 if (!def->ioprio && sqe->ioprio)
5093 if (!def->iopoll && (ctx->flags & IORING_SETUP_IOPOLL))
5096 if (def->needs_file) {
5097 struct io_submit_state *state = &ctx->submit_state;
5099 req->cqe.fd = READ_ONCE(sqe->fd);
5102 * Plug now if we have more than 2 IO left after this, and the
5103 * target is potentially a read/write to block based storage.
5105 if (state->need_plug && def->plug) {
5106 state->plug_started = true;
5107 state->need_plug = false;
5108 blk_start_plug_nr_ios(&state->plug, state->submit_nr);
5112 personality = READ_ONCE(sqe->personality);
5116 req->creds = xa_load(&ctx->personalities, personality);
5119 get_cred(req->creds);
5120 ret = security_uring_override_creds(req->creds);
5122 put_cred(req->creds);
5125 req->flags |= REQ_F_CREDS;
5128 return def->prep(req, sqe);
5131 static __cold int io_submit_fail_init(const struct io_uring_sqe *sqe,
5132 struct io_kiocb *req, int ret)
5134 struct io_ring_ctx *ctx = req->ctx;
5135 struct io_submit_link *link = &ctx->submit_state.link;
5136 struct io_kiocb *head = link->head;
5138 trace_io_uring_req_failed(sqe, ctx, req, ret);
5141 * Avoid breaking links in the middle as it renders links with SQPOLL
5142 * unusable. Instead of failing eagerly, continue assembling the link if
5143 * applicable and mark the head with REQ_F_FAIL. The link flushing code
5144 * should find the flag and handle the rest.
5146 req_fail_link_node(req, ret);
5147 if (head && !(head->flags & REQ_F_FAIL))
5148 req_fail_link_node(head, -ECANCELED);
5150 if (!(req->flags & IO_REQ_LINK_FLAGS)) {
5152 link->last->link = req;
5156 io_queue_sqe_fallback(req);
5161 link->last->link = req;
5168 static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
5169 const struct io_uring_sqe *sqe)
5170 __must_hold(&ctx->uring_lock)
5172 struct io_submit_link *link = &ctx->submit_state.link;
5175 ret = io_init_req(ctx, req, sqe);
5177 return io_submit_fail_init(sqe, req, ret);
5179 /* don't need @sqe from now on */
5180 trace_io_uring_submit_sqe(ctx, req, req->cqe.user_data, req->opcode,
5182 ctx->flags & IORING_SETUP_SQPOLL);
5185 * If we already have a head request, queue this one for async
5186 * submittal once the head completes. If we don't have a head but
5187 * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
5188 * submitted sync once the chain is complete. If none of those
5189 * conditions are true (normal request), then just queue it.
5191 if (unlikely(link->head)) {
5192 ret = io_req_prep_async(req);
5194 return io_submit_fail_init(sqe, req, ret);
5196 trace_io_uring_link(ctx, req, link->head);
5197 link->last->link = req;
5200 if (req->flags & IO_REQ_LINK_FLAGS)
5202 /* last request of the link, flush it */
5205 if (req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL))
5208 } else if (unlikely(req->flags & (IO_REQ_LINK_FLAGS |
5209 REQ_F_FORCE_ASYNC | REQ_F_FAIL))) {
5210 if (req->flags & IO_REQ_LINK_FLAGS) {
5215 io_queue_sqe_fallback(req);
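/*
 * Userspace builds such a chain with IOSQE_IO_LINK. A minimal sketch,
 * assuming liburing's helpers (illustration only): a write followed by an
 * fsync that must not start before the write completes.
 *
 *	sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_write(sqe, fd, buf, len, 0);
 *	sqe->flags |= IOSQE_IO_LINK;	// chain continues with the next SQE
 *
 *	sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_fsync(sqe, fd, 0);
 *
 *	io_uring_submit(&ring);		// fsync runs only after the write
 */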
5225 * Batched submission is done, ensure local IO is flushed out.
5227 static void io_submit_state_end(struct io_ring_ctx *ctx)
5229 struct io_submit_state *state = &ctx->submit_state;
5231 if (unlikely(state->link.head))
5232 io_queue_sqe_fallback(state->link.head);
5233 /* flush only after queuing links as they can generate completions */
5234 io_submit_flush_completions(ctx);
5235 if (state->plug_started)
5236 blk_finish_plug(&state->plug);
5240 * Start submission side cache.
5242 static void io_submit_state_start(struct io_submit_state *state,
5243 unsigned int max_ios)
5245 state->plug_started = false;
5246 state->need_plug = max_ios > 2;
5247 state->submit_nr = max_ios;
5248 /* set only head, no need to init link_last in advance */
5249 state->link.head = NULL;
5252 static void io_commit_sqring(struct io_ring_ctx *ctx)
5254 struct io_rings *rings = ctx->rings;
5257 * Ensure any loads from the SQEs are done at this point,
5258 * since once we write the new head, the application could
5259 * write new data to them.
5261 smp_store_release(&rings->sq.head, ctx->cached_sq_head);
5265 * Fetch an sqe, if one is available. Note this returns a pointer to memory
5266 * that is mapped by userspace. This means that care needs to be taken to
5267 * ensure that reads are stable, as we cannot rely on userspace always
5268 * being a good citizen. If members of the sqe are validated and then later
5269 * used, it's important that those reads are done through READ_ONCE() to
5270 * prevent a re-load down the line.
5272 static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx)
5274 unsigned head, mask = ctx->sq_entries - 1;
5275 unsigned sq_idx = ctx->cached_sq_head++ & mask;
5278 * The cached sq head (or cq tail) serves two purposes:
5280 * 1) allows us to batch the cost of updating the user visible head/tail.
5282 * 2) allows the kernel side to track the head on its own, even
5283 * though the application is the one updating it.
5285 head = READ_ONCE(ctx->sq_array[sq_idx]);
5286 if (likely(head < ctx->sq_entries)) {
5287 /* double index for 128-byte SQEs, twice as long */
5288 if (ctx->flags & IORING_SETUP_SQE128)
5290 return &ctx->sq_sqes[head];
5293 /* drop invalid entries */
5295 WRITE_ONCE(ctx->rings->sq_dropped,
5296 READ_ONCE(ctx->rings->sq_dropped) + 1);
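/*
 * Userspace counterpart, as a minimal sketch assuming liburing (illustration
 * only): the application fills the SQE completely and only then publishes
 * the new SQ tail (io_uring_submit() does the release store). The memory
 * stays application-writable afterwards, which is why every field is read
 * here and in the prep handlers with READ_ONCE() and validated.
 *
 *	struct io_uring ring;
 *	struct io_uring_sqe *sqe;
 *	struct io_uring_cqe *cqe;
 *
 *	io_uring_queue_init(8, &ring, 0);
 *	sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_nop(sqe);
 *	io_uring_sqe_set_data(sqe, (void *)0x1234);
 *	io_uring_submit(&ring);			// publishes the SQ tail
 *	io_uring_wait_cqe(&ring, &cqe);
 *	io_uring_cqe_seen(&ring, cqe);
 *	io_uring_queue_exit(&ring);
 */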
5300 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
5301 __must_hold(&ctx->uring_lock)
5303 unsigned int entries = io_sqring_entries(ctx);
5307 if (unlikely(!entries))
5309 /* make sure SQ entry isn't read before tail */
5310 ret = left = min3(nr, ctx->sq_entries, entries);
5311 io_get_task_refs(left);
5312 io_submit_state_start(&ctx->submit_state, left);
5315 const struct io_uring_sqe *sqe;
5316 struct io_kiocb *req;
5318 if (unlikely(!io_alloc_req_refill(ctx)))
5320 req = io_alloc_req(ctx);
5321 sqe = io_get_sqe(ctx);
5322 if (unlikely(!sqe)) {
5323 io_req_add_to_cache(req, ctx);
5328 * Continue submitting even for sqe failure if the
5329 * ring was setup with IORING_SETUP_SUBMIT_ALL
5331 if (unlikely(io_submit_sqe(ctx, req, sqe)) &&
5332 !(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
5338 if (unlikely(left)) {
5340 /* try again if it submitted nothing and can't allocate a req */
5341 if (!ret && io_req_cache_empty(ctx))
5343 current->io_uring->cached_refs += left;
5346 io_submit_state_end(ctx);
5347 /* Commit SQ ring head once we've consumed and submitted all SQEs */
5348 io_commit_sqring(ctx);
5352 static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
5354 return READ_ONCE(sqd->state);
5357 static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
5359 unsigned int to_submit;
5362 to_submit = io_sqring_entries(ctx);
5363 /* if we're handling multiple rings, cap submit size for fairness */
5364 if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
5365 to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;
5367 if (!wq_list_empty(&ctx->iopoll_list) || to_submit) {
5368 const struct cred *creds = NULL;
5370 if (ctx->sq_creds != current_cred())
5371 creds = override_creds(ctx->sq_creds);
5373 mutex_lock(&ctx->uring_lock);
5374 if (!wq_list_empty(&ctx->iopoll_list))
5375 io_do_iopoll(ctx, true);
5378 * Don't submit if refs are dying, good for io_uring_register(),
5379 * but it is also relied upon by io_ring_exit_work()
5381 if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
5382 !(ctx->flags & IORING_SETUP_R_DISABLED))
5383 ret = io_submit_sqes(ctx, to_submit);
5384 mutex_unlock(&ctx->uring_lock);
5386 if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
5387 wake_up(&ctx->sqo_sq_wait);
5389 revert_creds(creds);
5395 static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
5397 struct io_ring_ctx *ctx;
5398 unsigned sq_thread_idle = 0;
5400 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
5401 sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
5402 sqd->sq_thread_idle = sq_thread_idle;
5405 static bool io_sqd_handle_event(struct io_sq_data *sqd)
5407 bool did_sig = false;
5408 struct ksignal ksig;
5410 if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
5411 signal_pending(current)) {
5412 mutex_unlock(&sqd->lock);
5413 if (signal_pending(current))
5414 did_sig = get_signal(&ksig);
5416 mutex_lock(&sqd->lock);
5418 return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
5421 static int io_sq_thread(void *data)
5423 struct io_sq_data *sqd = data;
5424 struct io_ring_ctx *ctx;
5425 unsigned long timeout = 0;
5426 char buf[TASK_COMM_LEN];
5429 snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
5430 set_task_comm(current, buf);
5432 if (sqd->sq_cpu != -1)
5433 set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
5435 set_cpus_allowed_ptr(current, cpu_online_mask);
5436 current->flags |= PF_NO_SETAFFINITY;
5438 audit_alloc_kernel(current);
5440 mutex_lock(&sqd->lock);
5442 bool cap_entries, sqt_spin = false;
5444 if (io_sqd_events_pending(sqd) || signal_pending(current)) {
5445 if (io_sqd_handle_event(sqd))
5447 timeout = jiffies + sqd->sq_thread_idle;
5450 cap_entries = !list_is_singular(&sqd->ctx_list);
5451 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
5452 int ret = __io_sq_thread(ctx, cap_entries);
5454 if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
5457 if (io_run_task_work())
5460 if (sqt_spin || !time_after(jiffies, timeout)) {
5463 timeout = jiffies + sqd->sq_thread_idle;
5467 prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
5468 if (!io_sqd_events_pending(sqd) && !task_work_pending(current)) {
5469 bool needs_sched = true;
5471 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
5472 atomic_or(IORING_SQ_NEED_WAKEUP,
5473 &ctx->rings->sq_flags);
5474 if ((ctx->flags & IORING_SETUP_IOPOLL) &&
5475 !wq_list_empty(&ctx->iopoll_list)) {
5476 needs_sched = false;
5481 * Ensure the store of the wakeup flag is not
5482 * reordered with the load of the SQ tail
5484 smp_mb__after_atomic();
5486 if (io_sqring_entries(ctx)) {
5487 needs_sched = false;
5493 mutex_unlock(&sqd->lock);
5495 mutex_lock(&sqd->lock);
5497 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
5498 atomic_andnot(IORING_SQ_NEED_WAKEUP,
5499 &ctx->rings->sq_flags);
5502 finish_wait(&sqd->wait, &wait);
5503 timeout = jiffies + sqd->sq_thread_idle;
5506 io_uring_cancel_generic(true, sqd);
5508 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
5509 atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
5511 mutex_unlock(&sqd->lock);
5513 audit_free(current);
5515 complete(&sqd->exited);
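/*
 * Userspace counterpart of the IORING_SQ_NEED_WAKEUP dance above, as a raw
 * syscall sketch (illustration only; sq_flags stands for the mapped SQ ring
 * flags word): after storing the new SQ tail, the submitter needs a full
 * barrier before loading the flags, mirroring smp_mb__after_atomic() here,
 * and only enters the kernel if this thread has gone to sleep.
 *
 *	__atomic_thread_fence(__ATOMIC_SEQ_CST);
 *	if (*(volatile unsigned *)sq_flags & IORING_SQ_NEED_WAKEUP)
 *		syscall(__NR_io_uring_enter, ring_fd, to_submit, 0,
 *			IORING_ENTER_SQ_WAKEUP, NULL, 0);
 */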
5519 struct io_wait_queue {
5520 struct wait_queue_entry wq;
5521 struct io_ring_ctx *ctx;
5523 unsigned nr_timeouts;
5526 static inline bool io_should_wake(struct io_wait_queue *iowq)
5528 struct io_ring_ctx *ctx = iowq->ctx;
5529 int dist = ctx->cached_cq_tail - (int) iowq->cq_tail;
5532 * Wake up if we have enough events, or if a timeout occurred since we
5533 * started waiting. For timeouts, we always want to return to userspace,
5534 * regardless of event count.
5536 return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
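/*
 * The signed distance makes the check safe across u32 wraparound. Worked
 * example: if cq.head was 0xfffffff8 when the wait started and min_events
 * was 16, iowq->cq_tail wraps to 0x8. After 16 completions cached_cq_tail
 * is 0x8, dist is 0 and we wake up; after only 4 completions cached_cq_tail
 * is 0xfffffffc, dist is -12 and we keep waiting.
 */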
5539 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
5540 int wake_flags, void *key)
5542 struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
5546 * Cannot safely flush overflowed CQEs from here, ensure we wake up
5547 * the task, and the next invocation will do it.
5549 if (io_should_wake(iowq) ||
5550 test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &iowq->ctx->check_cq))
5551 return autoremove_wake_function(curr, mode, wake_flags, key);
5555 static int io_run_task_work_sig(void)
5557 if (io_run_task_work())
5559 if (test_thread_flag(TIF_NOTIFY_SIGNAL))
5560 return -ERESTARTSYS;
5561 if (task_sigpending(current))
5566 /* when returns >0, the caller should retry */
5567 static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
5568 struct io_wait_queue *iowq,
5572 unsigned long check_cq;
5574 /* make sure we run task_work before checking for signals */
5575 ret = io_run_task_work_sig();
5576 if (ret || io_should_wake(iowq))
5578 check_cq = READ_ONCE(ctx->check_cq);
5579 /* let the caller flush overflows, retry */
5580 if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
5582 if (unlikely(check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)))
5584 if (!schedule_hrtimeout(&timeout, HRTIMER_MODE_ABS))
5590 * Wait until events become available, if we don't already have some. The
5591 * application must reap them itself, as they reside on the shared cq ring.
5593 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
5594 const sigset_t __user *sig, size_t sigsz,
5595 struct __kernel_timespec __user *uts)
5597 struct io_wait_queue iowq;
5598 struct io_rings *rings = ctx->rings;
5599 ktime_t timeout = KTIME_MAX;
5603 io_cqring_overflow_flush(ctx);
5604 if (io_cqring_events(ctx) >= min_events)
5606 if (!io_run_task_work())
5611 #ifdef CONFIG_COMPAT
5612 if (in_compat_syscall())
5613 ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
5617 ret = set_user_sigmask(sig, sigsz);
5624 struct timespec64 ts;
5626 if (get_timespec64(&ts, uts))
5628 timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
5631 init_waitqueue_func_entry(&iowq.wq, io_wake_function);
5632 iowq.wq.private = current;
5633 INIT_LIST_HEAD(&iowq.wq.entry);
5635 iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
5636 iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
5638 trace_io_uring_cqring_wait(ctx, min_events);
5640 /* if we can't even flush overflow, don't wait for more */
5641 if (!io_cqring_overflow_flush(ctx)) {
5645 prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
5646 TASK_INTERRUPTIBLE);
5647 ret = io_cqring_wait_schedule(ctx, &iowq, timeout);
5651 finish_wait(&ctx->cq_wait, &iowq.wq);
5652 restore_saved_sigmask_unless(ret == -EINTR);
5654 return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
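/*
 * Userspace sketch of the matching wait, assuming liburing (illustration
 * only): min_events corresponds to wait_nr and uts to the timeout below.
 *
 *	struct __kernel_timespec ts = { .tv_sec = 1 };
 *	struct io_uring_cqe *cqe;
 *
 *	io_uring_wait_cqes(&ring, &cqe, 4, &ts, NULL);	// 4 CQEs or 1 second
 *	io_uring_cqe_seen(&ring, cqe);
 */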
5657 static void io_free_page_table(void **table, size_t size)
5659 unsigned i, nr_tables = DIV_ROUND_UP(size, PAGE_SIZE);
5661 for (i = 0; i < nr_tables; i++)
5666 static __cold void **io_alloc_page_table(size_t size)
5668 unsigned i, nr_tables = DIV_ROUND_UP(size, PAGE_SIZE);
5669 size_t init_size = size;
5672 table = kcalloc(nr_tables, sizeof(*table), GFP_KERNEL_ACCOUNT);
5676 for (i = 0; i < nr_tables; i++) {
5677 unsigned int this_size = min_t(size_t, size, PAGE_SIZE);
5679 table[i] = kzalloc(this_size, GFP_KERNEL_ACCOUNT);
5681 io_free_page_table(table, init_size);
5689 static void io_rsrc_node_destroy(struct io_rsrc_node *ref_node)
5691 percpu_ref_exit(&ref_node->refs);
5695 static __cold void io_rsrc_node_ref_zero(struct percpu_ref *ref)
5697 struct io_rsrc_node *node = container_of(ref, struct io_rsrc_node, refs);
5698 struct io_ring_ctx *ctx = node->rsrc_data->ctx;
5699 unsigned long flags;
5700 bool first_add = false;
5701 unsigned long delay = HZ;
5703 spin_lock_irqsave(&ctx->rsrc_ref_lock, flags);
5706 /* if we are mid-quiesce then do not delay */
5707 if (node->rsrc_data->quiesce)
5710 while (!list_empty(&ctx->rsrc_ref_list)) {
5711 node = list_first_entry(&ctx->rsrc_ref_list,
5712 struct io_rsrc_node, node);
5713 /* recycle ref nodes in order */
5716 list_del(&node->node);
5717 first_add |= llist_add(&node->llist, &ctx->rsrc_put_llist);
5719 spin_unlock_irqrestore(&ctx->rsrc_ref_lock, flags);
5722 mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay);
5725 static struct io_rsrc_node *io_rsrc_node_alloc(void)
5727 struct io_rsrc_node *ref_node;
5729 ref_node = kzalloc(sizeof(*ref_node), GFP_KERNEL);
5733 if (percpu_ref_init(&ref_node->refs, io_rsrc_node_ref_zero,
5738 INIT_LIST_HEAD(&ref_node->node);
5739 INIT_LIST_HEAD(&ref_node->rsrc_list);
5740 ref_node->done = false;
5744 void io_rsrc_node_switch(struct io_ring_ctx *ctx,
5745 struct io_rsrc_data *data_to_kill)
5746 __must_hold(&ctx->uring_lock)
5748 WARN_ON_ONCE(!ctx->rsrc_backup_node);
5749 WARN_ON_ONCE(data_to_kill && !ctx->rsrc_node);
5751 io_rsrc_refs_drop(ctx);
5754 struct io_rsrc_node *rsrc_node = ctx->rsrc_node;
5756 rsrc_node->rsrc_data = data_to_kill;
5757 spin_lock_irq(&ctx->rsrc_ref_lock);
5758 list_add_tail(&rsrc_node->node, &ctx->rsrc_ref_list);
5759 spin_unlock_irq(&ctx->rsrc_ref_lock);
5761 atomic_inc(&data_to_kill->refs);
5762 percpu_ref_kill(&rsrc_node->refs);
5763 ctx->rsrc_node = NULL;
5766 if (!ctx->rsrc_node) {
5767 ctx->rsrc_node = ctx->rsrc_backup_node;
5768 ctx->rsrc_backup_node = NULL;
5772 int io_rsrc_node_switch_start(struct io_ring_ctx *ctx)
5774 if (ctx->rsrc_backup_node)
5776 ctx->rsrc_backup_node = io_rsrc_node_alloc();
5777 return ctx->rsrc_backup_node ? 0 : -ENOMEM;
5780 static __cold int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
5781 struct io_ring_ctx *ctx)
5785 /* As we may drop ->uring_lock, other task may have started quiesce */
5789 data->quiesce = true;
5791 ret = io_rsrc_node_switch_start(ctx);
5794 io_rsrc_node_switch(ctx, data);
5796 /* kill initial ref, already quiesced if zero */
5797 if (atomic_dec_and_test(&data->refs))
5799 mutex_unlock(&ctx->uring_lock);
5800 flush_delayed_work(&ctx->rsrc_put_work);
5801 ret = wait_for_completion_interruptible(&data->done);
5803 mutex_lock(&ctx->uring_lock);
5804 if (atomic_read(&data->refs) > 0) {
5806 * it has been revived by another thread while we were waiting
5809 mutex_unlock(&ctx->uring_lock);
5815 atomic_inc(&data->refs);
5816 /* wait for all works potentially completing data->done */
5817 flush_delayed_work(&ctx->rsrc_put_work);
5818 reinit_completion(&data->done);
5820 ret = io_run_task_work_sig();
5821 mutex_lock(&ctx->uring_lock);
5823 data->quiesce = false;
5828 static u64 *io_get_tag_slot(struct io_rsrc_data *data, unsigned int idx)
5830 unsigned int off = idx & IO_RSRC_TAG_TABLE_MASK;
5831 unsigned int table_idx = idx >> IO_RSRC_TAG_TABLE_SHIFT;
5833 return &data->tags[table_idx][off];
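/*
 * Worked example: with 4K pages IO_RSRC_TAG_TABLE_SHIFT is 9, so each
 * second-level page holds 512 u64 tags and IO_RSRC_TAG_TABLE_MASK is 511.
 * idx 1000 then resolves to data->tags[1][488] (1000 >> 9 == 1,
 * 1000 & 511 == 488).
 */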
5836 static void io_rsrc_data_free(struct io_rsrc_data *data)
5838 size_t size = data->nr * sizeof(data->tags[0][0]);
5841 io_free_page_table((void **)data->tags, size);
5845 static __cold int io_rsrc_data_alloc(struct io_ring_ctx *ctx, rsrc_put_fn *do_put,
5846 u64 __user *utags, unsigned nr,
5847 struct io_rsrc_data **pdata)
5849 struct io_rsrc_data *data;
5853 data = kzalloc(sizeof(*data), GFP_KERNEL);
5856 data->tags = (u64 **)io_alloc_page_table(nr * sizeof(data->tags[0][0]));
5864 data->do_put = do_put;
5867 for (i = 0; i < nr; i++) {
5868 u64 *tag_slot = io_get_tag_slot(data, i);
5870 if (copy_from_user(tag_slot, &utags[i],
5876 atomic_set(&data->refs, 1);
5877 init_completion(&data->done);
5881 io_rsrc_data_free(data);
5885 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
5887 #if !defined(IO_URING_SCM_ALL)
5890 for (i = 0; i < ctx->nr_user_files; i++) {
5891 struct file *file = io_file_from_index(ctx, i);
5895 if (io_fixed_file_slot(&ctx->file_table, i)->file_ptr & FFS_SCM)
5897 io_file_bitmap_clear(&ctx->file_table, i);
5902 #if defined(CONFIG_UNIX)
5903 if (ctx->ring_sock) {
5904 struct sock *sock = ctx->ring_sock->sk;
5905 struct sk_buff *skb;
5907 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
5911 io_free_file_tables(&ctx->file_table);
5912 io_rsrc_data_free(ctx->file_data);
5913 ctx->file_data = NULL;
5914 ctx->nr_user_files = 0;
5917 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
5919 unsigned nr = ctx->nr_user_files;
5922 if (!ctx->file_data)
5926 * Quiesce may unlock ->uring_lock, and while it's not held
5927 * prevent new requests using the table.
5929 ctx->nr_user_files = 0;
5930 ret = io_rsrc_ref_quiesce(ctx->file_data, ctx);
5931 ctx->nr_user_files = nr;
5933 __io_sqe_files_unregister(ctx);
5937 static void io_sq_thread_unpark(struct io_sq_data *sqd)
5938 __releases(&sqd->lock)
5940 WARN_ON_ONCE(sqd->thread == current);
5943 * Do the dance but not conditional clear_bit() because it'd race with
5944 * other threads incrementing park_pending and setting the bit.
5946 clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
5947 if (atomic_dec_return(&sqd->park_pending))
5948 set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
5949 mutex_unlock(&sqd->lock);
5952 static void io_sq_thread_park(struct io_sq_data *sqd)
5953 __acquires(&sqd->lock)
5955 WARN_ON_ONCE(sqd->thread == current);
5957 atomic_inc(&sqd->park_pending);
5958 set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
5959 mutex_lock(&sqd->lock);
5961 wake_up_process(sqd->thread);
5964 static void io_sq_thread_stop(struct io_sq_data *sqd)
5966 WARN_ON_ONCE(sqd->thread == current);
5967 WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));
5969 set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
5970 mutex_lock(&sqd->lock);
5972 wake_up_process(sqd->thread);
5973 mutex_unlock(&sqd->lock);
5974 wait_for_completion(&sqd->exited);
5977 static void io_put_sq_data(struct io_sq_data *sqd)
5979 if (refcount_dec_and_test(&sqd->refs)) {
5980 WARN_ON_ONCE(atomic_read(&sqd->park_pending));
5982 io_sq_thread_stop(sqd);
5987 static void io_sq_thread_finish(struct io_ring_ctx *ctx)
5989 struct io_sq_data *sqd = ctx->sq_data;
5992 io_sq_thread_park(sqd);
5993 list_del_init(&ctx->sqd_list);
5994 io_sqd_update_thread_idle(sqd);
5995 io_sq_thread_unpark(sqd);
5997 io_put_sq_data(sqd);
5998 ctx->sq_data = NULL;
6002 static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
6004 struct io_ring_ctx *ctx_attach;
6005 struct io_sq_data *sqd;
6008 f = fdget(p->wq_fd);
6010 return ERR_PTR(-ENXIO);
6011 if (f.file->f_op != &io_uring_fops) {
6013 return ERR_PTR(-EINVAL);
6016 ctx_attach = f.file->private_data;
6017 sqd = ctx_attach->sq_data;
6020 return ERR_PTR(-EINVAL);
6022 if (sqd->task_tgid != current->tgid) {
6024 return ERR_PTR(-EPERM);
6027 refcount_inc(&sqd->refs);
6032 static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
6035 struct io_sq_data *sqd;
6038 if (p->flags & IORING_SETUP_ATTACH_WQ) {
6039 sqd = io_attach_sq_data(p);
6044 /* fall through for EPERM case, setup new sqd/task */
6045 if (PTR_ERR(sqd) != -EPERM)
6049 sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
6051 return ERR_PTR(-ENOMEM);
6053 atomic_set(&sqd->park_pending, 0);
6054 refcount_set(&sqd->refs, 1);
6055 INIT_LIST_HEAD(&sqd->ctx_list);
6056 mutex_init(&sqd->lock);
6057 init_waitqueue_head(&sqd->wait);
6058 init_completion(&sqd->exited);
6063 * Ensure the UNIX gc is aware of our file set, so we are certain that
6064 * the io_uring can be safely unregistered on process exit, even if we have
6065 * loops in the file referencing. We account only files that can hold other
6066 * files because otherwise they can't form a loop and so are not interesting for GC.
6069 static int io_scm_file_account(struct io_ring_ctx *ctx, struct file *file)
6071 #if defined(CONFIG_UNIX)
6072 struct sock *sk = ctx->ring_sock->sk;
6073 struct sk_buff_head *head = &sk->sk_receive_queue;
6074 struct scm_fp_list *fpl;
6075 struct sk_buff *skb;
6077 if (likely(!io_file_need_scm(file)))
6081 * See if we can merge this file into an existing skb SCM_RIGHTS
6082 * file set. If there's no room, fall back to allocating a new skb
6083 * and filling it in.
6085 spin_lock_irq(&head->lock);
6086 skb = skb_peek(head);
6087 if (skb && UNIXCB(skb).fp->count < SCM_MAX_FD)
6088 __skb_unlink(skb, head);
6091 spin_unlock_irq(&head->lock);
6094 fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
6098 skb = alloc_skb(0, GFP_KERNEL);
6104 fpl->user = get_uid(current_user());
6105 fpl->max = SCM_MAX_FD;
6108 UNIXCB(skb).fp = fpl;
6110 skb->destructor = unix_destruct_scm;
6111 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
6114 fpl = UNIXCB(skb).fp;
6115 fpl->fp[fpl->count++] = get_file(file);
6116 unix_inflight(fpl->user, file);
6117 skb_queue_head(head, skb);
6123 static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc)
6125 struct file *file = prsrc->file;
6126 #if defined(CONFIG_UNIX)
6127 struct sock *sock = ctx->ring_sock->sk;
6128 struct sk_buff_head list, *head = &sock->sk_receive_queue;
6129 struct sk_buff *skb;
6132 if (!io_file_need_scm(file)) {
6137 __skb_queue_head_init(&list);
6140 * Find the skb that holds this file in its SCM_RIGHTS. When found,
6141 * remove this entry and rearrange the file array.
6143 skb = skb_dequeue(head);
6145 struct scm_fp_list *fp;
6147 fp = UNIXCB(skb).fp;
6148 for (i = 0; i < fp->count; i++) {
6151 if (fp->fp[i] != file)
6154 unix_notinflight(fp->user, fp->fp[i]);
6155 left = fp->count - 1 - i;
6157 memmove(&fp->fp[i], &fp->fp[i + 1],
6158 left * sizeof(struct file *));
6165 __skb_queue_tail(&list, skb);
6175 __skb_queue_tail(&list, skb);
6177 skb = skb_dequeue(head);
6180 if (skb_peek(&list)) {
6181 spin_lock_irq(&head->lock);
6182 while ((skb = __skb_dequeue(&list)) != NULL)
6183 __skb_queue_tail(head, skb);
6184 spin_unlock_irq(&head->lock);
6191 static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
6193 struct io_rsrc_data *rsrc_data = ref_node->rsrc_data;
6194 struct io_ring_ctx *ctx = rsrc_data->ctx;
6195 struct io_rsrc_put *prsrc, *tmp;
6197 list_for_each_entry_safe(prsrc, tmp, &ref_node->rsrc_list, list) {
6198 list_del(&prsrc->list);
6201 if (ctx->flags & IORING_SETUP_IOPOLL)
6202 mutex_lock(&ctx->uring_lock);
6204 spin_lock(&ctx->completion_lock);
6205 io_fill_cqe_aux(ctx, prsrc->tag, 0, 0);
6206 io_commit_cqring(ctx);
6207 spin_unlock(&ctx->completion_lock);
6208 io_cqring_ev_posted(ctx);
6210 if (ctx->flags & IORING_SETUP_IOPOLL)
6211 mutex_unlock(&ctx->uring_lock);
6214 rsrc_data->do_put(ctx, prsrc);
6218 io_rsrc_node_destroy(ref_node);
6219 if (atomic_dec_and_test(&rsrc_data->refs))
6220 complete(&rsrc_data->done);
6223 static void io_rsrc_put_work(struct work_struct *work)
6225 struct io_ring_ctx *ctx;
6226 struct llist_node *node;
6228 ctx = container_of(work, struct io_ring_ctx, rsrc_put_work.work);
6229 node = llist_del_all(&ctx->rsrc_put_llist);
6232 struct io_rsrc_node *ref_node;
6233 struct llist_node *next = node->next;
6235 ref_node = llist_entry(node, struct io_rsrc_node, llist);
6236 __io_rsrc_put_work(ref_node);
6241 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
6242 unsigned nr_args, u64 __user *tags)
6244 __s32 __user *fds = (__s32 __user *) arg;
6253 if (nr_args > IORING_MAX_FIXED_FILES)
6255 if (nr_args > rlimit(RLIMIT_NOFILE))
6257 ret = io_rsrc_node_switch_start(ctx);
6260 ret = io_rsrc_data_alloc(ctx, io_rsrc_file_put, tags, nr_args,
6265 if (!io_alloc_file_tables(&ctx->file_table, nr_args)) {
6266 io_rsrc_data_free(ctx->file_data);
6267 ctx->file_data = NULL;
6271 for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
6272 struct io_fixed_file *file_slot;
6274 if (fds && copy_from_user(&fd, &fds[i], sizeof(fd))) {
6278 /* allow sparse sets */
6279 if (!fds || fd == -1) {
6281 if (unlikely(*io_get_tag_slot(ctx->file_data, i)))
6288 if (unlikely(!file))
6292 * Don't allow io_uring instances to be registered. If UNIX
6293 * isn't enabled, then this causes a reference cycle and this
6294 * instance can never get freed. If UNIX is enabled we'll
6295 * handle it just fine, but there's still no point in allowing
6296 * a ring fd as it doesn't support regular read/write anyway.
6298 if (file->f_op == &io_uring_fops) {
6302 ret = io_scm_file_account(ctx, file);
6307 file_slot = io_fixed_file_slot(&ctx->file_table, i);
6308 io_fixed_file_set(file_slot, file);
6309 io_file_bitmap_set(&ctx->file_table, i);
6312 io_rsrc_node_switch(ctx, NULL);
6315 __io_sqe_files_unregister(ctx);
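/*
 * Userspace sketch of registering a (possibly sparse) fixed file table and
 * using a slot, assuming liburing (illustration only):
 *
 *	int fds[4] = { fd0, fd1, -1, -1 };	// -1 leaves a sparse slot
 *
 *	io_uring_register_files(&ring, fds, 4);
 *
 *	sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_read(sqe, 1, buf, len, 0);	// 1 is a table index
 *	sqe->flags |= IOSQE_FIXED_FILE;			// not a regular fd
 */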
6319 int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx,
6320 struct io_rsrc_node *node, void *rsrc)
6322 u64 *tag_slot = io_get_tag_slot(data, idx);
6323 struct io_rsrc_put *prsrc;
6325 prsrc = kzalloc(sizeof(*prsrc), GFP_KERNEL);
6329 prsrc->tag = *tag_slot;
6332 list_add(&prsrc->list, &node->rsrc_list);
6336 int io_install_fixed_file(struct io_kiocb *req, struct file *file,
6337 unsigned int issue_flags, u32 slot_index)
6338 __must_hold(&req->ctx->uring_lock)
6340 struct io_ring_ctx *ctx = req->ctx;
6341 bool needs_switch = false;
6342 struct io_fixed_file *file_slot;
6345 if (file->f_op == &io_uring_fops)
6347 if (!ctx->file_data)
6349 if (slot_index >= ctx->nr_user_files)
6352 slot_index = array_index_nospec(slot_index, ctx->nr_user_files);
6353 file_slot = io_fixed_file_slot(&ctx->file_table, slot_index);
6355 if (file_slot->file_ptr) {
6356 struct file *old_file;
6358 ret = io_rsrc_node_switch_start(ctx);
6362 old_file = (struct file *)(file_slot->file_ptr & FFS_MASK);
6363 ret = io_queue_rsrc_removal(ctx->file_data, slot_index,
6364 ctx->rsrc_node, old_file);
6367 file_slot->file_ptr = 0;
6368 io_file_bitmap_clear(&ctx->file_table, slot_index);
6369 needs_switch = true;
6372 ret = io_scm_file_account(ctx, file);
6374 *io_get_tag_slot(ctx->file_data, slot_index) = 0;
6375 io_fixed_file_set(file_slot, file);
6376 io_file_bitmap_set(&ctx->file_table, slot_index);
6380 io_rsrc_node_switch(ctx, ctx->file_data);
6386 static int __io_sqe_files_update(struct io_ring_ctx *ctx,
6387 struct io_uring_rsrc_update2 *up,
6390 u64 __user *tags = u64_to_user_ptr(up->tags);
6391 __s32 __user *fds = u64_to_user_ptr(up->data);
6392 struct io_rsrc_data *data = ctx->file_data;
6393 struct io_fixed_file *file_slot;
6397 bool needs_switch = false;
6399 if (!ctx->file_data)
6401 if (up->offset + nr_args > ctx->nr_user_files)
6404 for (done = 0; done < nr_args; done++) {
6407 if ((tags && copy_from_user(&tag, &tags[done], sizeof(tag))) ||
6408 copy_from_user(&fd, &fds[done], sizeof(fd))) {
6412 if ((fd == IORING_REGISTER_FILES_SKIP || fd == -1) && tag) {
6416 if (fd == IORING_REGISTER_FILES_SKIP)
6419 i = array_index_nospec(up->offset + done, ctx->nr_user_files);
6420 file_slot = io_fixed_file_slot(&ctx->file_table, i);
6422 if (file_slot->file_ptr) {
6423 file = (struct file *)(file_slot->file_ptr & FFS_MASK);
6424 err = io_queue_rsrc_removal(data, i, ctx->rsrc_node, file);
6427 file_slot->file_ptr = 0;
6428 io_file_bitmap_clear(&ctx->file_table, i);
6429 needs_switch = true;
6438 * Don't allow io_uring instances to be registered. If
6439 * UNIX isn't enabled, then this causes a reference
6440 * cycle and this instance can never get freed. If UNIX
6441 * is enabled we'll handle it just fine, but there's
6442 * still no point in allowing a ring fd as it doesn't
6443 * support regular read/write anyway.
6445 if (file->f_op == &io_uring_fops) {
6450 err = io_scm_file_account(ctx, file);
6455 *io_get_tag_slot(data, i) = tag;
6456 io_fixed_file_set(file_slot, file);
6457 io_file_bitmap_set(&ctx->file_table, i);
6462 io_rsrc_node_switch(ctx, data);
6463 return done ? done : err;
6466 static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx,
6467 struct task_struct *task)
6469 struct io_wq_hash *hash;
6470 struct io_wq_data data;
6471 unsigned int concurrency;
6473 mutex_lock(&ctx->uring_lock);
6474 hash = ctx->hash_map;
6476 hash = kzalloc(sizeof(*hash), GFP_KERNEL);
6478 mutex_unlock(&ctx->uring_lock);
6479 return ERR_PTR(-ENOMEM);
6481 refcount_set(&hash->refs, 1);
6482 init_waitqueue_head(&hash->wait);
6483 ctx->hash_map = hash;
6485 mutex_unlock(&ctx->uring_lock);
6489 data.free_work = io_wq_free_work;
6490 data.do_work = io_wq_submit_work;
6492 /* Do QD, or 4 * CPUS, whatever is smallest */
6493 concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
6495 return io_wq_create(concurrency, &data);
6498 static __cold int io_uring_alloc_task_context(struct task_struct *task,
6499 struct io_ring_ctx *ctx)
6501 struct io_uring_task *tctx;
6504 tctx = kzalloc(sizeof(*tctx), GFP_KERNEL);
6505 if (unlikely(!tctx))
6508 tctx->registered_rings = kcalloc(IO_RINGFD_REG_MAX,
6509 sizeof(struct file *), GFP_KERNEL);
6510 if (unlikely(!tctx->registered_rings)) {
6515 ret = percpu_counter_init(&tctx->inflight, 0, GFP_KERNEL);
6516 if (unlikely(ret)) {
6517 kfree(tctx->registered_rings);
6522 tctx->io_wq = io_init_wq_offload(ctx, task);
6523 if (IS_ERR(tctx->io_wq)) {
6524 ret = PTR_ERR(tctx->io_wq);
6525 percpu_counter_destroy(&tctx->inflight);
6526 kfree(tctx->registered_rings);
6532 init_waitqueue_head(&tctx->wait);
6533 atomic_set(&tctx->in_idle, 0);
6534 atomic_set(&tctx->inflight_tracked, 0);
6535 task->io_uring = tctx;
6536 spin_lock_init(&tctx->task_lock);
6537 INIT_WQ_LIST(&tctx->task_list);
6538 INIT_WQ_LIST(&tctx->prio_task_list);
6539 init_task_work(&tctx->task_work, tctx_task_work);
6543 void __io_uring_free(struct task_struct *tsk)
6545 struct io_uring_task *tctx = tsk->io_uring;
6547 WARN_ON_ONCE(!xa_empty(&tctx->xa));
6548 WARN_ON_ONCE(tctx->io_wq);
6549 WARN_ON_ONCE(tctx->cached_refs);
6551 kfree(tctx->registered_rings);
6552 percpu_counter_destroy(&tctx->inflight);
6554 tsk->io_uring = NULL;
6557 static __cold int io_sq_offload_create(struct io_ring_ctx *ctx,
6558 struct io_uring_params *p)
6562 /* Retain compatibility with failing for an invalid attach attempt */
6563 if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
6564 IORING_SETUP_ATTACH_WQ) {
6567 f = fdget(p->wq_fd);
6570 if (f.file->f_op != &io_uring_fops) {
6576 if (ctx->flags & IORING_SETUP_SQPOLL) {
6577 struct task_struct *tsk;
6578 struct io_sq_data *sqd;
6581 ret = security_uring_sqpoll();
6585 sqd = io_get_sq_data(p, &attached);
6591 ctx->sq_creds = get_current_cred();
6593 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
6594 if (!ctx->sq_thread_idle)
6595 ctx->sq_thread_idle = HZ;
6597 io_sq_thread_park(sqd);
6598 list_add(&ctx->sqd_list, &sqd->ctx_list);
6599 io_sqd_update_thread_idle(sqd);
6600 /* don't attach to a dying SQPOLL thread, would be racy */
6601 ret = (attached && !sqd->thread) ? -ENXIO : 0;
6602 io_sq_thread_unpark(sqd);
6609 if (p->flags & IORING_SETUP_SQ_AFF) {
6610 int cpu = p->sq_thread_cpu;
6613 if (cpu >= nr_cpu_ids || !cpu_online(cpu))
6620 sqd->task_pid = current->pid;
6621 sqd->task_tgid = current->tgid;
6622 tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
6629 ret = io_uring_alloc_task_context(tsk, ctx);
6630 wake_up_new_task(tsk);
6633 } else if (p->flags & IORING_SETUP_SQ_AFF) {
6634 /* Can't have SQ_AFF without SQPOLL */
6641 complete(&ctx->sq_data->exited);
6643 io_sq_thread_finish(ctx);
6647 static inline void __io_unaccount_mem(struct user_struct *user,
6648 unsigned long nr_pages)
6650 atomic_long_sub(nr_pages, &user->locked_vm);
6653 static inline int __io_account_mem(struct user_struct *user,
6654 unsigned long nr_pages)
6656 unsigned long page_limit, cur_pages, new_pages;
6658 /* Don't allow more pages than we can safely lock */
6659 page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
6662 cur_pages = atomic_long_read(&user->locked_vm);
6663 new_pages = cur_pages + nr_pages;
6664 if (new_pages > page_limit)
6666 } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
6667 new_pages) != cur_pages);
6672 static void io_unaccount_mem(struct io_ring_ctx *ctx, unsigned long nr_pages)
6675 __io_unaccount_mem(ctx->user, nr_pages);
6677 if (ctx->mm_account)
6678 atomic64_sub(nr_pages, &ctx->mm_account->pinned_vm);
6681 static int io_account_mem(struct io_ring_ctx *ctx, unsigned long nr_pages)
6686 ret = __io_account_mem(ctx->user, nr_pages);
6691 if (ctx->mm_account)
6692 atomic64_add(nr_pages, &ctx->mm_account->pinned_vm);
6697 static void io_mem_free(void *ptr)
6704 page = virt_to_head_page(ptr);
6705 if (put_page_testzero(page))
6706 free_compound_page(page);
6709 static void *io_mem_alloc(size_t size)
6711 gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP;
6713 return (void *) __get_free_pages(gfp, get_order(size));
6716 static unsigned long rings_size(struct io_ring_ctx *ctx, unsigned int sq_entries,
6717 unsigned int cq_entries, size_t *sq_offset)
6719 struct io_rings *rings;
6720 size_t off, sq_array_size;
6722 off = struct_size(rings, cqes, cq_entries);
6723 if (off == SIZE_MAX)
6725 if (ctx->flags & IORING_SETUP_CQE32) {
6726 if (check_shl_overflow(off, 1, &off))
6731 off = ALIGN(off, SMP_CACHE_BYTES);
6739 sq_array_size = array_size(sizeof(u32), sq_entries);
6740 if (sq_array_size == SIZE_MAX)
6743 if (check_add_overflow(off, sq_array_size, &off))
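/*
 * Worked example of the layout computed above: with 8 CQ entries and
 * default 16-byte CQEs, off starts at sizeof(struct io_rings) + 8 * 16
 * (doubled if IORING_SETUP_CQE32 selects 32-byte CQEs), is rounded up to
 * SMP_CACHE_BYTES, and the SQ index array of sq_entries * sizeof(u32)
 * bytes is appended after it.
 */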
6749 static void io_buffer_unmap(struct io_ring_ctx *ctx, struct io_mapped_ubuf **slot)
6751 struct io_mapped_ubuf *imu = *slot;
6754 if (imu != ctx->dummy_ubuf) {
6755 for (i = 0; i < imu->nr_bvecs; i++)
6756 unpin_user_page(imu->bvec[i].bv_page);
6757 if (imu->acct_pages)
6758 io_unaccount_mem(ctx, imu->acct_pages);
6764 static void io_rsrc_buf_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc)
6766 io_buffer_unmap(ctx, &prsrc->buf);
6770 static void __io_sqe_buffers_unregister(struct io_ring_ctx *ctx)
6774 for (i = 0; i < ctx->nr_user_bufs; i++)
6775 io_buffer_unmap(ctx, &ctx->user_bufs[i]);
6776 kfree(ctx->user_bufs);
6777 io_rsrc_data_free(ctx->buf_data);
6778 ctx->user_bufs = NULL;
6779 ctx->buf_data = NULL;
6780 ctx->nr_user_bufs = 0;
6783 static int io_sqe_buffers_unregister(struct io_ring_ctx *ctx)
6785 unsigned nr = ctx->nr_user_bufs;
6792 * Quiesce may unlock ->uring_lock, and while it's not held
6793 * prevent new requests using the table.
6795 ctx->nr_user_bufs = 0;
6796 ret = io_rsrc_ref_quiesce(ctx->buf_data, ctx);
6797 ctx->nr_user_bufs = nr;
6799 __io_sqe_buffers_unregister(ctx);
6803 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
6804 void __user *arg, unsigned index)
6806 struct iovec __user *src;
6808 #ifdef CONFIG_COMPAT
6810 struct compat_iovec __user *ciovs;
6811 struct compat_iovec ciov;
6813 ciovs = (struct compat_iovec __user *) arg;
6814 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
6817 dst->iov_base = u64_to_user_ptr((u64)ciov.iov_base);
6818 dst->iov_len = ciov.iov_len;
6822 src = (struct iovec __user *) arg;
6823 if (copy_from_user(dst, &src[index], sizeof(*dst)))
6829 * Not super efficient, but this only runs at registration time. And we do cache
6830 * the last compound head, so generally we'll only do a full search if we don't match that one.
6833 * We check if the given compound head page has already been accounted, to
6834 * avoid double accounting it. This allows us to account the full size of the
6835 * page, not just the constituent pages of a huge page.
6837 static bool headpage_already_acct(struct io_ring_ctx *ctx, struct page **pages,
6838 int nr_pages, struct page *hpage)
6842 /* check current page array */
6843 for (i = 0; i < nr_pages; i++) {
6844 if (!PageCompound(pages[i]))
6846 if (compound_head(pages[i]) == hpage)
6850 /* check previously registered pages */
6851 for (i = 0; i < ctx->nr_user_bufs; i++) {
6852 struct io_mapped_ubuf *imu = ctx->user_bufs[i];
6854 for (j = 0; j < imu->nr_bvecs; j++) {
6855 if (!PageCompound(imu->bvec[j].bv_page))
6857 if (compound_head(imu->bvec[j].bv_page) == hpage)
6865 static int io_buffer_account_pin(struct io_ring_ctx *ctx, struct page **pages,
6866 int nr_pages, struct io_mapped_ubuf *imu,
6867 struct page **last_hpage)
6871 imu->acct_pages = 0;
6872 for (i = 0; i < nr_pages; i++) {
6873 if (!PageCompound(pages[i])) {
6878 hpage = compound_head(pages[i]);
6879 if (hpage == *last_hpage)
6881 *last_hpage = hpage;
6882 if (headpage_already_acct(ctx, pages, i, hpage))
6884 imu->acct_pages += page_size(hpage) >> PAGE_SHIFT;
6888 if (!imu->acct_pages)
6891 ret = io_account_mem(ctx, imu->acct_pages);
6893 imu->acct_pages = 0;
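/*
 * Worked example: registering a buffer that sits in one 2MB huge page pins
 * 512 base pages, but the loop above charges acct_pages only once, adding
 * page_size(hpage) >> PAGE_SHIFT == 512 for the head page, and the
 * headpage_already_acct() lookup keeps another buffer sharing the same huge
 * page from being charged for it again.
 */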
6897 static struct page **io_pin_pages(unsigned long ubuf, unsigned long len,
6900 unsigned long start, end, nr_pages;
6901 struct vm_area_struct **vmas = NULL;
6902 struct page **pages = NULL;
6903 int i, pret, ret = -ENOMEM;
6905 end = (ubuf + len + PAGE_SIZE - 1) >> PAGE_SHIFT;
6906 start = ubuf >> PAGE_SHIFT;
6907 nr_pages = end - start;
6909 pages = kvmalloc_array(nr_pages, sizeof(struct page *), GFP_KERNEL);
6913 vmas = kvmalloc_array(nr_pages, sizeof(struct vm_area_struct *),
6919 mmap_read_lock(current->mm);
6920 pret = pin_user_pages(ubuf, nr_pages, FOLL_WRITE | FOLL_LONGTERM,
6922 if (pret == nr_pages) {
6923 /* don't support file backed memory */
6924 for (i = 0; i < nr_pages; i++) {
6925 struct vm_area_struct *vma = vmas[i];
6927 if (vma_is_shmem(vma))
6930 !is_file_hugepages(vma->vm_file)) {
6937 ret = pret < 0 ? pret : -EFAULT;
6939 mmap_read_unlock(current->mm);
6942 * if we did partial map, or found file backed vmas,
6943 * release any pages we did get
6946 unpin_user_pages(pages, pret);
6954 pages = ERR_PTR(ret);
6959 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
6960 struct io_mapped_ubuf **pimu,
6961 struct page **last_hpage)
6963 struct io_mapped_ubuf *imu = NULL;
6964 struct page **pages = NULL;
6967 int ret, nr_pages, i;
6969 if (!iov->iov_base) {
6970 *pimu = ctx->dummy_ubuf;
6977 pages = io_pin_pages((unsigned long) iov->iov_base, iov->iov_len,
6979 if (IS_ERR(pages)) {
6980 ret = PTR_ERR(pages);
6985 imu = kvmalloc(struct_size(imu, bvec, nr_pages), GFP_KERNEL);
6989 ret = io_buffer_account_pin(ctx, pages, nr_pages, imu, last_hpage);
6991 unpin_user_pages(pages, nr_pages);
6995 off = (unsigned long) iov->iov_base & ~PAGE_MASK;
6996 size = iov->iov_len;
6997 for (i = 0; i < nr_pages; i++) {
7000 vec_len = min_t(size_t, size, PAGE_SIZE - off);
7001 imu->bvec[i].bv_page = pages[i];
7002 imu->bvec[i].bv_len = vec_len;
7003 imu->bvec[i].bv_offset = off;
7007 /* store original address for later verification */
7008 imu->ubuf = (unsigned long) iov->iov_base;
7009 imu->ubuf_end = imu->ubuf + iov->iov_len;
7010 imu->nr_bvecs = nr_pages;
7020 static int io_buffers_map_alloc(struct io_ring_ctx *ctx, unsigned int nr_args)
7022 ctx->user_bufs = kcalloc(nr_args, sizeof(*ctx->user_bufs), GFP_KERNEL);
7023 return ctx->user_bufs ? 0 : -ENOMEM;
7026 static int io_buffer_validate(struct iovec *iov)
7028 unsigned long tmp, acct_len = iov->iov_len + (PAGE_SIZE - 1);
7031 * Don't impose further limits on the size and buffer
7032 * constraints here, we'll -EINVAL later when IO is
7033 * submitted if they are wrong.
7036 return iov->iov_len ? -EFAULT : 0;
7040 /* arbitrary limit, but we need something */
7041 if (iov->iov_len > SZ_1G)
7044 if (check_add_overflow((unsigned long)iov->iov_base, acct_len, &tmp))
7050 static int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg,
7051 unsigned int nr_args, u64 __user *tags)
7053 struct page *last_hpage = NULL;
7054 struct io_rsrc_data *data;
7060 if (!nr_args || nr_args > IORING_MAX_REG_BUFFERS)
7062 ret = io_rsrc_node_switch_start(ctx);
7065 ret = io_rsrc_data_alloc(ctx, io_rsrc_buf_put, tags, nr_args, &data);
7068 ret = io_buffers_map_alloc(ctx, nr_args);
7070 io_rsrc_data_free(data);
7074 for (i = 0; i < nr_args; i++, ctx->nr_user_bufs++) {
7076 ret = io_copy_iov(ctx, &iov, arg, i);
7079 ret = io_buffer_validate(&iov);
7083 memset(&iov, 0, sizeof(iov));
7086 if (!iov.iov_base && *io_get_tag_slot(data, i)) {
7091 ret = io_sqe_buffer_register(ctx, &iov, &ctx->user_bufs[i],
7097 WARN_ON_ONCE(ctx->buf_data);
7099 ctx->buf_data = data;
7101 __io_sqe_buffers_unregister(ctx);
7103 io_rsrc_node_switch(ctx, NULL);
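/*
 * Userspace sketch of registering a fixed buffer and using it, assuming
 * liburing (illustration only):
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = buf_len };
 *
 *	io_uring_register_buffers(&ring, &iov, 1);
 *
 *	sqe = io_uring_get_sqe(&ring);
 *	io_uring_prep_read_fixed(sqe, fd, buf, buf_len, 0, 0);	// buf_index 0
 *	io_uring_submit(&ring);
 */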
7107 static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
7108 struct io_uring_rsrc_update2 *up,
7109 unsigned int nr_args)
7111 u64 __user *tags = u64_to_user_ptr(up->tags);
7112 struct iovec iov, __user *iovs = u64_to_user_ptr(up->data);
7113 struct page *last_hpage = NULL;
7114 bool needs_switch = false;
7120 if (up->offset + nr_args > ctx->nr_user_bufs)
7123 for (done = 0; done < nr_args; done++) {
7124 struct io_mapped_ubuf *imu;
7125 int offset = up->offset + done;
7128 err = io_copy_iov(ctx, &iov, iovs, done);
7131 if (tags && copy_from_user(&tag, &tags[done], sizeof(tag))) {
7135 err = io_buffer_validate(&iov);
7138 if (!iov.iov_base && tag) {
7142 err = io_sqe_buffer_register(ctx, &iov, &imu, &last_hpage);
7146 i = array_index_nospec(offset, ctx->nr_user_bufs);
7147 if (ctx->user_bufs[i] != ctx->dummy_ubuf) {
7148 err = io_queue_rsrc_removal(ctx->buf_data, i,
7149 ctx->rsrc_node, ctx->user_bufs[i]);
7150 if (unlikely(err)) {
7151 io_buffer_unmap(ctx, &imu);
7154 ctx->user_bufs[i] = NULL;
7155 needs_switch = true;
7158 ctx->user_bufs[i] = imu;
7159 *io_get_tag_slot(ctx->buf_data, offset) = tag;
7163 io_rsrc_node_switch(ctx, ctx->buf_data);
7164 return done ? done : err;
7167 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg,
7168 unsigned int eventfd_async)
7170 struct io_ev_fd *ev_fd;
7171 __s32 __user *fds = arg;
7174 ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
7175 lockdep_is_held(&ctx->uring_lock));
7179 if (copy_from_user(&fd, fds, sizeof(*fds)))
7182 ev_fd = kmalloc(sizeof(*ev_fd), GFP_KERNEL);
7186 ev_fd->cq_ev_fd = eventfd_ctx_fdget(fd);
7187 if (IS_ERR(ev_fd->cq_ev_fd)) {
7188 int ret = PTR_ERR(ev_fd->cq_ev_fd);
7192 ev_fd->eventfd_async = eventfd_async;
7193 ctx->has_evfd = true;
7194 rcu_assign_pointer(ctx->io_ev_fd, ev_fd);
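/*
 * Userspace sketch, assuming liburing (illustration only): register an
 * eventfd so completions can be picked up via poll/epoll on efd.
 *
 *	int efd = eventfd(0, EFD_CLOEXEC);
 *
 *	io_uring_register_eventfd(&ring, efd);
 *	// or io_uring_register_eventfd_async() to be signalled only for
 *	// completions that did not complete inline with submission
 */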
7198 static void io_eventfd_put(struct rcu_head *rcu)
7200 struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu);
7202 eventfd_ctx_put(ev_fd->cq_ev_fd);
7206 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
7208 struct io_ev_fd *ev_fd;
7210 ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
7211 lockdep_is_held(&ctx->uring_lock));
7213 ctx->has_evfd = false;
7214 rcu_assign_pointer(ctx->io_ev_fd, NULL);
7215 call_rcu(&ev_fd->rcu, io_eventfd_put);
7222 static void io_destroy_buffers(struct io_ring_ctx *ctx)
7224 struct io_buffer_list *bl;
7225 unsigned long index;
7228 for (i = 0; i < BGID_ARRAY; i++) {
7231 __io_remove_buffers(ctx, &ctx->io_bl[i], -1U);
7234 xa_for_each(&ctx->io_bl_xa, index, bl) {
7235 xa_erase(&ctx->io_bl_xa, bl->bgid);
7236 __io_remove_buffers(ctx, bl, -1U);
7240 while (!list_empty(&ctx->io_buffers_pages)) {
7243 page = list_first_entry(&ctx->io_buffers_pages, struct page, lru);
7244 list_del_init(&page->lru);
7249 static void io_req_caches_free(struct io_ring_ctx *ctx)
7251 struct io_submit_state *state = &ctx->submit_state;
7254 mutex_lock(&ctx->uring_lock);
7255 io_flush_cached_locked_reqs(ctx, state);
7257 while (!io_req_cache_empty(ctx)) {
7258 struct io_wq_work_node *node;
7259 struct io_kiocb *req;
7261 node = wq_stack_extract(&state->free_list);
7262 req = container_of(node, struct io_kiocb, comp_list);
7263 kmem_cache_free(req_cachep, req);
7267 percpu_ref_put_many(&ctx->refs, nr);
7268 mutex_unlock(&ctx->uring_lock);
7271 static void io_wait_rsrc_data(struct io_rsrc_data *data)
7273 if (data && !atomic_dec_and_test(&data->refs))
7274 wait_for_completion(&data->done);
7277 static void io_flush_apoll_cache(struct io_ring_ctx *ctx)
7279 struct async_poll *apoll;
7281 while (!list_empty(&ctx->apoll_cache)) {
7282 apoll = list_first_entry(&ctx->apoll_cache, struct async_poll,
7284 list_del(&apoll->poll.wait.entry);
7289 static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
7291 io_sq_thread_finish(ctx);
7293 if (ctx->mm_account) {
7294 mmdrop(ctx->mm_account);
7295 ctx->mm_account = NULL;
7298 io_rsrc_refs_drop(ctx);
7299 /* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
7300 io_wait_rsrc_data(ctx->buf_data);
7301 io_wait_rsrc_data(ctx->file_data);
7303 mutex_lock(&ctx->uring_lock);
7305 __io_sqe_buffers_unregister(ctx);
7307 __io_sqe_files_unregister(ctx);
7309 __io_cqring_overflow_flush(ctx, true);
7310 io_eventfd_unregister(ctx);
7311 io_flush_apoll_cache(ctx);
7312 mutex_unlock(&ctx->uring_lock);
7313 io_destroy_buffers(ctx);
7315 put_cred(ctx->sq_creds);
7317 /* there are no registered resources left, nobody uses it */
7319 io_rsrc_node_destroy(ctx->rsrc_node);
7320 if (ctx->rsrc_backup_node)
7321 io_rsrc_node_destroy(ctx->rsrc_backup_node);
7322 flush_delayed_work(&ctx->rsrc_put_work);
7323 flush_delayed_work(&ctx->fallback_work);
7325 WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list));
7326 WARN_ON_ONCE(!llist_empty(&ctx->rsrc_put_llist));
7328 #if defined(CONFIG_UNIX)
7329 if (ctx->ring_sock) {
7330 ctx->ring_sock->file = NULL; /* so that iput() is called */
7331 sock_release(ctx->ring_sock);
7334 WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
7336 io_mem_free(ctx->rings);
7337 io_mem_free(ctx->sq_sqes);
7339 percpu_ref_exit(&ctx->refs);
7340 free_uid(ctx->user);
7341 io_req_caches_free(ctx);
7343 io_wq_put_hash(ctx->hash_map);
7344 kfree(ctx->cancel_hash);
7345 kfree(ctx->dummy_ubuf);
7347 xa_destroy(&ctx->io_bl_xa);
7351 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
7353 struct io_ring_ctx *ctx = file->private_data;
7356 poll_wait(file, &ctx->cq_wait, wait);
7358 * synchronizes with barrier from wq_has_sleeper call in io_commit_cqring()
7362 if (!io_sqring_full(ctx))
7363 mask |= EPOLLOUT | EPOLLWRNORM;
7366 * Don't flush cqring overflow list here, just do a simple check.
7367 * Otherwise there could possibly be an ABBA deadlock:
7370 * lock(&ctx->uring_lock);
7372 * lock(&ctx->uring_lock);
7375 * Users may get EPOLLIN meanwhile and see nothing in the cqring; this
7376 * pushes them to do the flush.
7378 if (io_cqring_events(ctx) ||
7379 test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
7380 mask |= EPOLLIN | EPOLLRDNORM;
7385 static int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id)
7387 const struct cred *creds;
7389 creds = xa_erase(&ctx->personalities, id);
7398 struct io_tctx_exit {
7399 struct callback_head task_work;
7400 struct completion completion;
7401 struct io_ring_ctx *ctx;
7404 static __cold void io_tctx_exit_cb(struct callback_head *cb)
7406 struct io_uring_task *tctx = current->io_uring;
7407 struct io_tctx_exit *work;
7409 work = container_of(cb, struct io_tctx_exit, task_work);
7411 * When @in_idle, we're in cancellation and it's racy to remove the
7412 * node. It'll be removed by the end of cancellation, just ignore it.
7414 if (!atomic_read(&tctx->in_idle))
7415 io_uring_del_tctx_node((unsigned long)work->ctx);
7416 complete(&work->completion);
7419 static __cold bool io_cancel_ctx_cb(struct io_wq_work *work, void *data)
7421 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
7423 return req->ctx == data;
7426 static __cold void io_ring_exit_work(struct work_struct *work)
7428 struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work);
7429 unsigned long timeout = jiffies + HZ * 60 * 5;
7430 unsigned long interval = HZ / 20;
7431 struct io_tctx_exit exit;
7432 struct io_tctx_node *node;
7436 * If we're doing polled IO and end up having requests being
7437 * submitted async (out-of-line), then completions can come in while
7438 * we're waiting for refs to drop. We need to reap these manually,
7439 * as nobody else will be looking for them.
7442 io_uring_try_cancel_requests(ctx, NULL, true);
7444 struct io_sq_data *sqd = ctx->sq_data;
7445 struct task_struct *tsk;
7447 io_sq_thread_park(sqd);
7449 if (tsk && tsk->io_uring && tsk->io_uring->io_wq)
7450 io_wq_cancel_cb(tsk->io_uring->io_wq,
7451 io_cancel_ctx_cb, ctx, true);
7452 io_sq_thread_unpark(sqd);
7455 io_req_caches_free(ctx);
7457 if (WARN_ON_ONCE(time_after(jiffies, timeout))) {
7458 /* there is little hope left, don't run it too often */
7461 } while (!wait_for_completion_timeout(&ctx->ref_comp, interval));
7463 init_completion(&exit.completion);
7464 init_task_work(&exit.task_work, io_tctx_exit_cb);
7467	 * Some may use the context even when all refs and requests have been put,
7468 * and they are free to do so while still holding uring_lock or
7469 * completion_lock, see io_req_task_submit(). Apart from other work,
7470	 * this lock/unlock section also waits for them to finish.
7472 mutex_lock(&ctx->uring_lock);
7473 while (!list_empty(&ctx->tctx_list)) {
7474 WARN_ON_ONCE(time_after(jiffies, timeout));
7476 node = list_first_entry(&ctx->tctx_list, struct io_tctx_node,
7478 /* don't spin on a single task if cancellation failed */
7479 list_rotate_left(&ctx->tctx_list);
7480 ret = task_work_add(node->task, &exit.task_work, TWA_SIGNAL);
7481 if (WARN_ON_ONCE(ret))
7484 mutex_unlock(&ctx->uring_lock);
7485 wait_for_completion(&exit.completion);
7486 mutex_lock(&ctx->uring_lock);
7488 mutex_unlock(&ctx->uring_lock);
7489 spin_lock(&ctx->completion_lock);
7490 spin_unlock(&ctx->completion_lock);
7492 io_ring_ctx_free(ctx);
7495 static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
7497 unsigned long index;
7498 struct creds *creds;
7500 mutex_lock(&ctx->uring_lock);
7501 percpu_ref_kill(&ctx->refs);
7503 __io_cqring_overflow_flush(ctx, true);
7504 xa_for_each(&ctx->personalities, index, creds)
7505 io_unregister_personality(ctx, index);
7506 mutex_unlock(&ctx->uring_lock);
7508 /* failed during ring init, it couldn't have issued any requests */
7510 io_kill_timeouts(ctx, NULL, true);
7511 io_poll_remove_all(ctx, NULL, true);
7512 /* if we failed setting up the ctx, we might not have any rings */
7513 io_iopoll_try_reap_events(ctx);
7516 INIT_WORK(&ctx->exit_work, io_ring_exit_work);
7518 * Use system_unbound_wq to avoid spawning tons of event kworkers
7519 * if we're exiting a ton of rings at the same time. It just adds
7520	 * noise and overhead; there's no discernible change in runtime
7521 * over using system_wq.
7523 queue_work(system_unbound_wq, &ctx->exit_work);
7526 static int io_uring_release(struct inode *inode, struct file *file)
7528 struct io_ring_ctx *ctx = file->private_data;
7530 file->private_data = NULL;
7531 io_ring_ctx_wait_and_kill(ctx);
7535 struct io_task_cancel {
7536 struct task_struct *task;
7540 static bool io_cancel_task_cb(struct io_wq_work *work, void *data)
7542 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
7543 struct io_task_cancel *cancel = data;
7545 return io_match_task_safe(req, cancel->task, cancel->all);
7548 static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
7549 struct task_struct *task,
7552 struct io_defer_entry *de;
7555 spin_lock(&ctx->completion_lock);
7556 list_for_each_entry_reverse(de, &ctx->defer_list, list) {
7557 if (io_match_task_safe(de->req, task, cancel_all)) {
7558 list_cut_position(&list, &ctx->defer_list, &de->list);
7562 spin_unlock(&ctx->completion_lock);
7563 if (list_empty(&list))
7566 while (!list_empty(&list)) {
7567 de = list_first_entry(&list, struct io_defer_entry, list);
7568 list_del_init(&de->list);
7569 io_req_complete_failed(de->req, -ECANCELED);
7575 static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
7577 struct io_tctx_node *node;
7578 enum io_wq_cancel cret;
7581 mutex_lock(&ctx->uring_lock);
7582 list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
7583 struct io_uring_task *tctx = node->task->io_uring;
7586 * io_wq will stay alive while we hold uring_lock, because it's
7587	 * killed after ctx nodes, which requires taking the lock.
7589 if (!tctx || !tctx->io_wq)
7591 cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true);
7592 ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
7594 mutex_unlock(&ctx->uring_lock);
7599 static __cold void io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
7600 struct task_struct *task,
7603 struct io_task_cancel cancel = { .task = task, .all = cancel_all, };
7604 struct io_uring_task *tctx = task ? task->io_uring : NULL;
7606 /* failed during ring init, it couldn't have issued any requests */
7611 enum io_wq_cancel cret;
7615 ret |= io_uring_try_cancel_iowq(ctx);
7616 } else if (tctx && tctx->io_wq) {
7618 * Cancels requests of all rings, not only @ctx, but
7619 * it's fine as the task is in exit/exec.
7621 cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_task_cb,
7623 ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
7626 /* SQPOLL thread does its own polling */
7627 if ((!(ctx->flags & IORING_SETUP_SQPOLL) && cancel_all) ||
7628 (ctx->sq_data && ctx->sq_data->thread == current)) {
7629 while (!wq_list_empty(&ctx->iopoll_list)) {
7630 io_iopoll_try_reap_events(ctx);
7635 ret |= io_cancel_defer_files(ctx, task, cancel_all);
7636 ret |= io_poll_remove_all(ctx, task, cancel_all);
7637 ret |= io_kill_timeouts(ctx, task, cancel_all);
7639 ret |= io_run_task_work();
7646 static int __io_uring_add_tctx_node(struct io_ring_ctx *ctx)
7648 struct io_uring_task *tctx = current->io_uring;
7649 struct io_tctx_node *node;
7652 if (unlikely(!tctx)) {
7653 ret = io_uring_alloc_task_context(current, ctx);
7657 tctx = current->io_uring;
7658 if (ctx->iowq_limits_set) {
7659 unsigned int limits[2] = { ctx->iowq_limits[0],
7660 ctx->iowq_limits[1], };
7662 ret = io_wq_max_workers(tctx->io_wq, limits);
7667 if (!xa_load(&tctx->xa, (unsigned long)ctx)) {
7668 node = kmalloc(sizeof(*node), GFP_KERNEL);
7672 node->task = current;
7674 ret = xa_err(xa_store(&tctx->xa, (unsigned long)ctx,
7681 mutex_lock(&ctx->uring_lock);
7682 list_add(&node->ctx_node, &ctx->tctx_list);
7683 mutex_unlock(&ctx->uring_lock);
7690	 * Note that this task has used io_uring. We use it for cancellation purposes.
7692 static inline int io_uring_add_tctx_node(struct io_ring_ctx *ctx)
7694 struct io_uring_task *tctx = current->io_uring;
7696 if (likely(tctx && tctx->last == ctx))
7698 return __io_uring_add_tctx_node(ctx);
7702 * Remove this io_uring_file -> task mapping.
7704 static __cold void io_uring_del_tctx_node(unsigned long index)
7706 struct io_uring_task *tctx = current->io_uring;
7707 struct io_tctx_node *node;
7711 node = xa_erase(&tctx->xa, index);
7715 WARN_ON_ONCE(current != node->task);
7716 WARN_ON_ONCE(list_empty(&node->ctx_node));
7718 mutex_lock(&node->ctx->uring_lock);
7719 list_del(&node->ctx_node);
7720 mutex_unlock(&node->ctx->uring_lock);
7722 if (tctx->last == node->ctx)
7727 static __cold void io_uring_clean_tctx(struct io_uring_task *tctx)
7729 struct io_wq *wq = tctx->io_wq;
7730 struct io_tctx_node *node;
7731 unsigned long index;
7733 xa_for_each(&tctx->xa, index, node) {
7734 io_uring_del_tctx_node(index);
7739 * Must be after io_uring_del_tctx_node() (removes nodes under
7740 * uring_lock) to avoid race with io_uring_try_cancel_iowq().
7742 io_wq_put_and_exit(wq);
7747 static s64 tctx_inflight(struct io_uring_task *tctx, bool tracked)
7750 return atomic_read(&tctx->inflight_tracked);
7751 return percpu_counter_sum(&tctx->inflight);
7755 * Find any io_uring ctx that this task has registered or done IO on, and cancel
7756 * requests. @sqd should be not-null IFF it's an SQPOLL thread cancellation.
7758 static __cold void io_uring_cancel_generic(bool cancel_all,
7759 struct io_sq_data *sqd)
7761 struct io_uring_task *tctx = current->io_uring;
7762 struct io_ring_ctx *ctx;
7766 WARN_ON_ONCE(sqd && sqd->thread != current);
7768 if (!current->io_uring)
7771 io_wq_exit_start(tctx->io_wq);
7773 atomic_inc(&tctx->in_idle);
7775 io_uring_drop_tctx_refs(current);
7776	/* read completions before cancellations */
7777 inflight = tctx_inflight(tctx, !cancel_all);
7782 struct io_tctx_node *node;
7783 unsigned long index;
7785 xa_for_each(&tctx->xa, index, node) {
7786 /* sqpoll task will cancel all its requests */
7787 if (node->ctx->sq_data)
7789 io_uring_try_cancel_requests(node->ctx, current,
7793 list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
7794 io_uring_try_cancel_requests(ctx, current,
7798 prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
7800 io_uring_drop_tctx_refs(current);
7803 * If we've seen completions, retry without waiting. This
7804 * avoids a race where a completion comes in before we did
7805 * prepare_to_wait().
7807 if (inflight == tctx_inflight(tctx, !cancel_all))
7809 finish_wait(&tctx->wait, &wait);
7812 io_uring_clean_tctx(tctx);
7815 * We shouldn't run task_works after cancel, so just leave
7816 * ->in_idle set for normal exit.
7818 atomic_dec(&tctx->in_idle);
7819 /* for exec all current's requests should be gone, kill tctx */
7820 __io_uring_free(current);
7824 void __io_uring_cancel(bool cancel_all)
7826 io_uring_cancel_generic(cancel_all, NULL);
7829 void io_uring_unreg_ringfd(void)
7831 struct io_uring_task *tctx = current->io_uring;
7834 for (i = 0; i < IO_RINGFD_REG_MAX; i++) {
7835 if (tctx->registered_rings[i]) {
7836 fput(tctx->registered_rings[i]);
7837 tctx->registered_rings[i] = NULL;
7842 static int io_ring_add_registered_fd(struct io_uring_task *tctx, int fd,
7848 for (offset = start; offset < end; offset++) {
7849 offset = array_index_nospec(offset, IO_RINGFD_REG_MAX);
7850 if (tctx->registered_rings[offset])
7856 } else if (file->f_op != &io_uring_fops) {
7860 tctx->registered_rings[offset] = file;
7868 * Register a ring fd to avoid fdget/fdput for each io_uring_enter()
7869	 * invocation. The user passes in an array of struct io_uring_rsrc_update,
7870	 * with ->data set to the ring_fd and ->offset giving the desired
7871	 * index. If no particular index is desired, the application may set
7872	 * ->offset == -1U and we'll find an available index. Returns the number
7873	 * of entries successfully processed, or < 0 on error if none were processed.
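	 *
	 * A minimal userspace sketch of that flow follows; the struct and flag
	 * names come from the uapi header, while the io_uring_register()
	 * wrapper shown is only illustrative:
	 *
	 *	struct io_uring_rsrc_update reg = {
	 *		.offset	= -1U,
	 *		.data	= ring_fd,
	 *	};
	 *	ret = io_uring_register(ring_fd, IORING_REGISTER_RING_FDS, &reg, 1);
	 *
	 * Using ->offset == -1U asks the kernel to pick any free slot; on
	 * success ret is 1 and reg.offset holds the chosen index, which is
	 * later passed as the fd argument to io_uring_enter() along with
	 * IORING_ENTER_REGISTERED_RING.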
7875 static int io_ringfd_register(struct io_ring_ctx *ctx, void __user *__arg,
7878 struct io_uring_rsrc_update __user *arg = __arg;
7879 struct io_uring_rsrc_update reg;
7880 struct io_uring_task *tctx;
7883 if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
7886 mutex_unlock(&ctx->uring_lock);
7887 ret = io_uring_add_tctx_node(ctx);
7888 mutex_lock(&ctx->uring_lock);
7892 tctx = current->io_uring;
7893 for (i = 0; i < nr_args; i++) {
7896	 if (copy_from_user(&reg, &arg[i], sizeof(reg))) {
7906 if (reg.offset == -1U) {
7908 end = IO_RINGFD_REG_MAX;
7910 if (reg.offset >= IO_RINGFD_REG_MAX) {
7918 ret = io_ring_add_registered_fd(tctx, reg.data, start, end);
7923	 if (copy_to_user(&arg[i], &reg, sizeof(reg))) {
7924 fput(tctx->registered_rings[reg.offset]);
7925 tctx->registered_rings[reg.offset] = NULL;
7934 static int io_ringfd_unregister(struct io_ring_ctx *ctx, void __user *__arg,
7937 struct io_uring_rsrc_update __user *arg = __arg;
7938 struct io_uring_task *tctx = current->io_uring;
7939 struct io_uring_rsrc_update reg;
7942 if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
7947 for (i = 0; i < nr_args; i++) {
7948	 if (copy_from_user(&reg, &arg[i], sizeof(reg))) {
7952 if (reg.resv || reg.data || reg.offset >= IO_RINGFD_REG_MAX) {
7957 reg.offset = array_index_nospec(reg.offset, IO_RINGFD_REG_MAX);
7958 if (tctx->registered_rings[reg.offset]) {
7959 fput(tctx->registered_rings[reg.offset]);
7960 tctx->registered_rings[reg.offset] = NULL;
7967 static void *io_uring_validate_mmap_request(struct file *file,
7968 loff_t pgoff, size_t sz)
7970 struct io_ring_ctx *ctx = file->private_data;
7971 loff_t offset = pgoff << PAGE_SHIFT;
7976 case IORING_OFF_SQ_RING:
7977 case IORING_OFF_CQ_RING:
7980 case IORING_OFF_SQES:
7984 return ERR_PTR(-EINVAL);
7987 page = virt_to_head_page(ptr);
7988 if (sz > page_size(page))
7989 return ERR_PTR(-EINVAL);
7996 static __cold int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
7998 size_t sz = vma->vm_end - vma->vm_start;
8002 ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
8004 return PTR_ERR(ptr);
8006 pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
8007 return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
8010 #else /* !CONFIG_MMU */
8012 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
8014 return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
8017 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
8019 return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
8022 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
8023 unsigned long addr, unsigned long len,
8024 unsigned long pgoff, unsigned long flags)
8028 ptr = io_uring_validate_mmap_request(file, pgoff, len);
8030 return PTR_ERR(ptr);
8032 return (unsigned long) ptr;
8035 #endif /* !CONFIG_MMU */
8037 static int io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
8042 if (!io_sqring_full(ctx))
8044 prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);
8046 if (!io_sqring_full(ctx))
8049 } while (!signal_pending(current));
8051 finish_wait(&ctx->sqo_sq_wait, &wait);
8055 static int io_validate_ext_arg(unsigned flags, const void __user *argp, size_t argsz)
8057 if (flags & IORING_ENTER_EXT_ARG) {
8058 struct io_uring_getevents_arg arg;
8060 if (argsz != sizeof(arg))
8062 if (copy_from_user(&arg, argp, sizeof(arg)))
8068 static int io_get_ext_arg(unsigned flags, const void __user *argp, size_t *argsz,
8069 struct __kernel_timespec __user **ts,
8070 const sigset_t __user **sig)
8072 struct io_uring_getevents_arg arg;
8075 * If EXT_ARG isn't set, then we have no timespec and the argp pointer
8076 * is just a pointer to the sigset_t.
8078 if (!(flags & IORING_ENTER_EXT_ARG)) {
8079 *sig = (const sigset_t __user *) argp;
8085 * EXT_ARG is set - ensure we agree on the size of it and copy in our
8086 * timespec and sigset_t pointers if good.
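	 *
	 * For reference, a sketch of what userspace passes in that case (field
	 * names per the uapi struct io_uring_getevents_arg; the raw syscall
	 * form is illustrative only):
	 *
	 *	struct io_uring_getevents_arg arg = {
	 *		.sigmask	= (__u64)(unsigned long)&mask,
	 *		.sigmask_sz	= _NSIG / 8,
	 *		.ts		= (__u64)(unsigned long)&timeout,
	 *	};
	 *	syscall(__NR_io_uring_enter, fd, 0, 1,
	 *		IORING_ENTER_GETEVENTS | IORING_ENTER_EXT_ARG,
	 *		&arg, sizeof(arg));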
8088 if (*argsz != sizeof(arg))
8090 if (copy_from_user(&arg, argp, sizeof(arg)))
8094 *sig = u64_to_user_ptr(arg.sigmask);
8095 *argsz = arg.sigmask_sz;
8096 *ts = u64_to_user_ptr(arg.ts);
8100 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
8101 u32, min_complete, u32, flags, const void __user *, argp,
8104 struct io_ring_ctx *ctx;
8110 if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
8111 IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
8112 IORING_ENTER_REGISTERED_RING)))
8116 * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
8117 * need only dereference our task private array to find it.
8119 if (flags & IORING_ENTER_REGISTERED_RING) {
8120 struct io_uring_task *tctx = current->io_uring;
8122 if (!tctx || fd >= IO_RINGFD_REG_MAX)
8124 fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
8125 f.file = tctx->registered_rings[fd];
8131 if (unlikely(!f.file))
8135 if (unlikely(f.file->f_op != &io_uring_fops))
8139 ctx = f.file->private_data;
8140 if (unlikely(!percpu_ref_tryget(&ctx->refs)))
8144 if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED))
8148 * For SQ polling, the thread will do all submissions and completions.
8149 * Just return the requested submit count, and wake the thread if
8153 if (ctx->flags & IORING_SETUP_SQPOLL) {
8154 io_cqring_overflow_flush(ctx);
8156 if (unlikely(ctx->sq_data->thread == NULL)) {
8160 if (flags & IORING_ENTER_SQ_WAKEUP)
8161 wake_up(&ctx->sq_data->wait);
8162 if (flags & IORING_ENTER_SQ_WAIT) {
8163 ret = io_sqpoll_wait_sq(ctx);
8168 } else if (to_submit) {
8169 ret = io_uring_add_tctx_node(ctx);
8173 mutex_lock(&ctx->uring_lock);
8174 ret = io_submit_sqes(ctx, to_submit);
8175 if (ret != to_submit) {
8176 mutex_unlock(&ctx->uring_lock);
8179 if ((flags & IORING_ENTER_GETEVENTS) && ctx->syscall_iopoll)
8181 mutex_unlock(&ctx->uring_lock);
8183 if (flags & IORING_ENTER_GETEVENTS) {
8185 if (ctx->syscall_iopoll) {
8187 * We disallow the app entering submit/complete with
8188 * polling, but we still need to lock the ring to
8189 * prevent racing with polled issue that got punted to
8192 mutex_lock(&ctx->uring_lock);
8194 ret2 = io_validate_ext_arg(flags, argp, argsz);
8195 if (likely(!ret2)) {
8196 min_complete = min(min_complete,
8198 ret2 = io_iopoll_check(ctx, min_complete);
8200 mutex_unlock(&ctx->uring_lock);
8202 const sigset_t __user *sig;
8203 struct __kernel_timespec __user *ts;
8205 ret2 = io_get_ext_arg(flags, argp, &argsz, &ts, &sig);
8206 if (likely(!ret2)) {
8207 min_complete = min(min_complete,
8209 ret2 = io_cqring_wait(ctx, min_complete, sig,
8218 * EBADR indicates that one or more CQE were dropped.
8219 * Once the user has been informed we can clear the bit
8220 * as they are obviously ok with those drops.
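	 *
	 * For illustration, a userspace wait loop might note the condition
	 * roughly like this (cqes_were_dropped is a hypothetical application
	 * flag; purely a sketch):
	 *
	 *	ret = syscall(__NR_io_uring_enter, fd, 0, 1,
	 *		      IORING_ENTER_GETEVENTS, NULL, 0);
	 *	if (ret < 0 && errno == EBADR)
	 *		cqes_were_dropped = true;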
8222 if (unlikely(ret2 == -EBADR))
8223 clear_bit(IO_CHECK_CQ_DROPPED_BIT,
8229 percpu_ref_put(&ctx->refs);
8235 #ifdef CONFIG_PROC_FS
8236 static __cold int io_uring_show_cred(struct seq_file *m, unsigned int id,
8237 const struct cred *cred)
8239 struct user_namespace *uns = seq_user_ns(m);
8240 struct group_info *gi;
8245 seq_printf(m, "%5d\n", id);
8246 seq_put_decimal_ull(m, "\tUid:\t", from_kuid_munged(uns, cred->uid));
8247 seq_put_decimal_ull(m, "\t\t", from_kuid_munged(uns, cred->euid));
8248 seq_put_decimal_ull(m, "\t\t", from_kuid_munged(uns, cred->suid));
8249 seq_put_decimal_ull(m, "\t\t", from_kuid_munged(uns, cred->fsuid));
8250 seq_put_decimal_ull(m, "\n\tGid:\t", from_kgid_munged(uns, cred->gid));
8251 seq_put_decimal_ull(m, "\t\t", from_kgid_munged(uns, cred->egid));
8252 seq_put_decimal_ull(m, "\t\t", from_kgid_munged(uns, cred->sgid));
8253 seq_put_decimal_ull(m, "\t\t", from_kgid_munged(uns, cred->fsgid));
8254 seq_puts(m, "\n\tGroups:\t");
8255 gi = cred->group_info;
8256 for (g = 0; g < gi->ngroups; g++) {
8257 seq_put_decimal_ull(m, g ? " " : "",
8258 from_kgid_munged(uns, gi->gid[g]));
8260 seq_puts(m, "\n\tCapEff:\t");
8261 cap = cred->cap_effective;
8262 CAP_FOR_EACH_U32(__capi)
8263 seq_put_hex_ll(m, NULL, cap.cap[CAP_LAST_U32 - __capi], 8);
8268 static __cold void __io_uring_show_fdinfo(struct io_ring_ctx *ctx,
8271 struct io_sq_data *sq = NULL;
8272 struct io_overflow_cqe *ocqe;
8273 struct io_rings *r = ctx->rings;
8274 unsigned int sq_mask = ctx->sq_entries - 1, cq_mask = ctx->cq_entries - 1;
8275 unsigned int sq_head = READ_ONCE(r->sq.head);
8276 unsigned int sq_tail = READ_ONCE(r->sq.tail);
8277 unsigned int cq_head = READ_ONCE(r->cq.head);
8278 unsigned int cq_tail = READ_ONCE(r->cq.tail);
8279 unsigned int cq_shift = 0;
8280 unsigned int sq_entries, cq_entries;
8282 bool is_cqe32 = (ctx->flags & IORING_SETUP_CQE32);
8289	 * we may get imprecise sqe and cqe info if the ring is actively running,
8290	 * since we read cached_sq_head and cached_cq_tail without uring_lock,
8291	 * and sq_tail and cq_head are changed by userspace. But that's OK, since
8292	 * we usually only use this info when the ring is stuck.
8294 seq_printf(m, "SqMask:\t0x%x\n", sq_mask);
8295 seq_printf(m, "SqHead:\t%u\n", sq_head);
8296 seq_printf(m, "SqTail:\t%u\n", sq_tail);
8297 seq_printf(m, "CachedSqHead:\t%u\n", ctx->cached_sq_head);
8298 seq_printf(m, "CqMask:\t0x%x\n", cq_mask);
8299 seq_printf(m, "CqHead:\t%u\n", cq_head);
8300 seq_printf(m, "CqTail:\t%u\n", cq_tail);
8301 seq_printf(m, "CachedCqTail:\t%u\n", ctx->cached_cq_tail);
8302 seq_printf(m, "SQEs:\t%u\n", sq_tail - ctx->cached_sq_head);
8303 sq_entries = min(sq_tail - sq_head, ctx->sq_entries);
8304 for (i = 0; i < sq_entries; i++) {
8305 unsigned int entry = i + sq_head;
8306 unsigned int sq_idx = READ_ONCE(ctx->sq_array[entry & sq_mask]);
8307 struct io_uring_sqe *sqe;
8309 if (sq_idx > sq_mask)
8311 sqe = &ctx->sq_sqes[sq_idx];
8312 seq_printf(m, "%5u: opcode:%d, fd:%d, flags:%x, user_data:%llu\n",
8313 sq_idx, sqe->opcode, sqe->fd, sqe->flags,
8316 seq_printf(m, "CQEs:\t%u\n", cq_tail - cq_head);
8317 cq_entries = min(cq_tail - cq_head, ctx->cq_entries);
8318 for (i = 0; i < cq_entries; i++) {
8319 unsigned int entry = i + cq_head;
8320 struct io_uring_cqe *cqe = &r->cqes[(entry & cq_mask) << cq_shift];
8323 seq_printf(m, "%5u: user_data:%llu, res:%d, flag:%x\n",
8324 entry & cq_mask, cqe->user_data, cqe->res,
8327 seq_printf(m, "%5u: user_data:%llu, res:%d, flag:%x, "
8328 "extra1:%llu, extra2:%llu\n",
8329 entry & cq_mask, cqe->user_data, cqe->res,
8330 cqe->flags, cqe->big_cqe[0], cqe->big_cqe[1]);
8335 * Avoid ABBA deadlock between the seq lock and the io_uring mutex,
8336 * since fdinfo case grabs it in the opposite direction of normal use
8337 * cases. If we fail to get the lock, we just don't iterate any
8338 * structures that could be going away outside the io_uring mutex.
8340 has_lock = mutex_trylock(&ctx->uring_lock);
8342 if (has_lock && (ctx->flags & IORING_SETUP_SQPOLL)) {
8348 seq_printf(m, "SqThread:\t%d\n", sq ? task_pid_nr(sq->thread) : -1);
8349 seq_printf(m, "SqThreadCpu:\t%d\n", sq ? task_cpu(sq->thread) : -1);
8350 seq_printf(m, "UserFiles:\t%u\n", ctx->nr_user_files);
8351 for (i = 0; has_lock && i < ctx->nr_user_files; i++) {
8352 struct file *f = io_file_from_index(ctx, i);
8355 seq_printf(m, "%5u: %s\n", i, file_dentry(f)->d_iname);
8357 seq_printf(m, "%5u: <none>\n", i);
8359 seq_printf(m, "UserBufs:\t%u\n", ctx->nr_user_bufs);
8360 for (i = 0; has_lock && i < ctx->nr_user_bufs; i++) {
8361 struct io_mapped_ubuf *buf = ctx->user_bufs[i];
8362 unsigned int len = buf->ubuf_end - buf->ubuf;
8364 seq_printf(m, "%5u: 0x%llx/%u\n", i, buf->ubuf, len);
8366 if (has_lock && !xa_empty(&ctx->personalities)) {
8367 unsigned long index;
8368 const struct cred *cred;
8370 seq_printf(m, "Personalities:\n");
8371 xa_for_each(&ctx->personalities, index, cred)
8372 io_uring_show_cred(m, index, cred);
8375 mutex_unlock(&ctx->uring_lock);
8377 seq_puts(m, "PollList:\n");
8378 spin_lock(&ctx->completion_lock);
8379 for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
8380 struct hlist_head *list = &ctx->cancel_hash[i];
8381 struct io_kiocb *req;
8383 hlist_for_each_entry(req, list, hash_node)
8384 seq_printf(m, " op=%d, task_works=%d\n", req->opcode,
8385 task_work_pending(req->task));
8388 seq_puts(m, "CqOverflowList:\n");
8389 list_for_each_entry(ocqe, &ctx->cq_overflow_list, list) {
8390 struct io_uring_cqe *cqe = &ocqe->cqe;
8392 seq_printf(m, " user_data=%llu, res=%d, flags=%x\n",
8393 cqe->user_data, cqe->res, cqe->flags);
8397 spin_unlock(&ctx->completion_lock);
8400 static __cold void io_uring_show_fdinfo(struct seq_file *m, struct file *f)
8402 struct io_ring_ctx *ctx = f->private_data;
8404 if (percpu_ref_tryget(&ctx->refs)) {
8405 __io_uring_show_fdinfo(ctx, m);
8406 percpu_ref_put(&ctx->refs);
8411 static const struct file_operations io_uring_fops = {
8412 .release = io_uring_release,
8413 .mmap = io_uring_mmap,
8415 .get_unmapped_area = io_uring_nommu_get_unmapped_area,
8416 .mmap_capabilities = io_uring_nommu_mmap_capabilities,
8418 .poll = io_uring_poll,
8419 #ifdef CONFIG_PROC_FS
8420 .show_fdinfo = io_uring_show_fdinfo,
8424 static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx,
8425 struct io_uring_params *p)
8427 struct io_rings *rings;
8428 size_t size, sq_array_offset;
8430 /* make sure these are sane, as we already accounted them */
8431 ctx->sq_entries = p->sq_entries;
8432 ctx->cq_entries = p->cq_entries;
8434 size = rings_size(ctx, p->sq_entries, p->cq_entries, &sq_array_offset);
8435 if (size == SIZE_MAX)
8438 rings = io_mem_alloc(size);
8443 ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
8444 rings->sq_ring_mask = p->sq_entries - 1;
8445 rings->cq_ring_mask = p->cq_entries - 1;
8446 rings->sq_ring_entries = p->sq_entries;
8447 rings->cq_ring_entries = p->cq_entries;
8449 if (p->flags & IORING_SETUP_SQE128)
8450 size = array_size(2 * sizeof(struct io_uring_sqe), p->sq_entries);
8452 size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
8453 if (size == SIZE_MAX) {
8454 io_mem_free(ctx->rings);
8459 ctx->sq_sqes = io_mem_alloc(size);
8460 if (!ctx->sq_sqes) {
8461 io_mem_free(ctx->rings);
8469 static int io_uring_install_fd(struct io_ring_ctx *ctx, struct file *file)
8473 fd = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
8477 ret = io_uring_add_tctx_node(ctx);
8482 fd_install(fd, file);
8487	 * Allocate an anonymous fd; this is what constitutes the application-
8488	 * visible backing of an io_uring instance. The application mmaps this
8489 * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
8490 * we have to tie this fd to a socket for file garbage collection purposes.
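	 *
	 * For reference, the application side of that mmap usually looks
	 * roughly like the sketch below, using the offsets io_uring_setup()
	 * returned in struct io_uring_params (illustrative only, not code
	 * used by the kernel):
	 *
	 *	sq_ptr = mmap(NULL, p.sq_off.array + p.sq_entries * sizeof(__u32),
	 *		      PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
	 *		      ring_fd, IORING_OFF_SQ_RING);
	 *	sqes   = mmap(NULL, p.sq_entries * sizeof(struct io_uring_sqe),
	 *		      PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
	 *		      ring_fd, IORING_OFF_SQES);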
8492 static struct file *io_uring_get_file(struct io_ring_ctx *ctx)
8495 #if defined(CONFIG_UNIX)
8498 ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
8501 return ERR_PTR(ret);
8504 file = anon_inode_getfile_secure("[io_uring]", &io_uring_fops, ctx,
8505 O_RDWR | O_CLOEXEC, NULL);
8506 #if defined(CONFIG_UNIX)
8508 sock_release(ctx->ring_sock);
8509 ctx->ring_sock = NULL;
8511 ctx->ring_sock->file = file;
8517 static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
8518 struct io_uring_params __user *params)
8520 struct io_ring_ctx *ctx;
8526 if (entries > IORING_MAX_ENTRIES) {
8527 if (!(p->flags & IORING_SETUP_CLAMP))
8529 entries = IORING_MAX_ENTRIES;
8533 * Use twice as many entries for the CQ ring. It's possible for the
8534 * application to drive a higher depth than the size of the SQ ring,
8535 * since the sqes are only used at submission time. This allows for
8536 * some flexibility in overcommitting a bit. If the application has
8537 * set IORING_SETUP_CQSIZE, it will have passed in the desired number
8538 * of CQ ring entries manually.
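	 *
	 * For example, a request for 100 entries becomes sq_entries == 128
	 * after the power-of-two roundup below, and (absent
	 * IORING_SETUP_CQSIZE) cq_entries == 256.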
8540 p->sq_entries = roundup_pow_of_two(entries);
8541 if (p->flags & IORING_SETUP_CQSIZE) {
8543 * If IORING_SETUP_CQSIZE is set, we do the same roundup
8544 * to a power-of-two, if it isn't already. We do NOT impose
8545 * any cq vs sq ring sizing.
8549 if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
8550 if (!(p->flags & IORING_SETUP_CLAMP))
8552 p->cq_entries = IORING_MAX_CQ_ENTRIES;
8554 p->cq_entries = roundup_pow_of_two(p->cq_entries);
8555 if (p->cq_entries < p->sq_entries)
8558 p->cq_entries = 2 * p->sq_entries;
8561 ctx = io_ring_ctx_alloc(p);
8566	 * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, userspace
8567	 * applications don't need to poll for IO completion events
8568	 * themselves; they can rely on io_sq_thread to do that polling
8569	 * work, which reduces CPU usage and uring_lock contention.
8571 if (ctx->flags & IORING_SETUP_IOPOLL &&
8572 !(ctx->flags & IORING_SETUP_SQPOLL))
8573 ctx->syscall_iopoll = 1;
8575 ctx->compat = in_compat_syscall();
8576 if (!capable(CAP_IPC_LOCK))
8577 ctx->user = get_uid(current_user());
8580 * For SQPOLL, we just need a wakeup, always. For !SQPOLL, if
8581 * COOP_TASKRUN is set, then IPIs are never needed by the app.
8584 if (ctx->flags & IORING_SETUP_SQPOLL) {
8585 /* IPI related flags don't make sense with SQPOLL */
8586 if (ctx->flags & (IORING_SETUP_COOP_TASKRUN |
8587 IORING_SETUP_TASKRUN_FLAG))
8589 ctx->notify_method = TWA_SIGNAL_NO_IPI;
8590 } else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) {
8591 ctx->notify_method = TWA_SIGNAL_NO_IPI;
8593 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
8595 ctx->notify_method = TWA_SIGNAL;
8599 * This is just grabbed for accounting purposes. When a process exits,
8600 * the mm is exited and dropped before the files, hence we need to hang
8601	 * on to this mm purely so that we can unaccount memory
8602	 * (locked/pinned vm). It's not used for anything else.
8604 mmgrab(current->mm);
8605 ctx->mm_account = current->mm;
8607 ret = io_allocate_scq_urings(ctx, p);
8611 ret = io_sq_offload_create(ctx, p);
8614 /* always set a rsrc node */
8615 ret = io_rsrc_node_switch_start(ctx);
8618 io_rsrc_node_switch(ctx, NULL);
8620 memset(&p->sq_off, 0, sizeof(p->sq_off));
8621 p->sq_off.head = offsetof(struct io_rings, sq.head);
8622 p->sq_off.tail = offsetof(struct io_rings, sq.tail);
8623 p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
8624 p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
8625 p->sq_off.flags = offsetof(struct io_rings, sq_flags);
8626 p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
8627 p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
8629 memset(&p->cq_off, 0, sizeof(p->cq_off));
8630 p->cq_off.head = offsetof(struct io_rings, cq.head);
8631 p->cq_off.tail = offsetof(struct io_rings, cq.tail);
8632 p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
8633 p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
8634 p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
8635 p->cq_off.cqes = offsetof(struct io_rings, cqes);
8636 p->cq_off.flags = offsetof(struct io_rings, cq_flags);
8638 p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
8639 IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
8640 IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
8641 IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
8642 IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
8643 IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
8644 IORING_FEAT_LINKED_FILE;
8646 if (copy_to_user(params, p, sizeof(*p))) {
8651 file = io_uring_get_file(ctx);
8653 ret = PTR_ERR(file);
8658 * Install ring fd as the very last thing, so we don't risk someone
8659 * having closed it before we finish setup
8661 ret = io_uring_install_fd(ctx, file);
8663 /* fput will clean it up */
8668 trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
8671 io_ring_ctx_wait_and_kill(ctx);
8676	 * Sets up an io_uring context and returns the fd. Applications ask for a
8677	 * ring size; we return the actual sq/cq ring sizes (among other things)
8678	 * in the params structure passed in.
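	 *
	 * A rough userspace sketch of the call, in raw syscall form
	 * (illustrative only):
	 *
	 *	struct io_uring_params p = { 0 };
	 *	int ring_fd = syscall(__NR_io_uring_setup, 8, &p);
	 *
	 * On return, p.sq_entries and p.cq_entries hold the actual ring sizes,
	 * and p.sq_off / p.cq_off give the offsets used to mmap the rings.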
8680 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
8682 struct io_uring_params p;
8685 if (copy_from_user(&p, params, sizeof(p)))
8687 for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
8692 if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
8693 IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
8694 IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
8695 IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
8696 IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
8697 IORING_SETUP_SQE128 | IORING_SETUP_CQE32))
8700 return io_uring_create(entries, &p, params);
8703 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
8704 struct io_uring_params __user *, params)
8706 return io_uring_setup(entries, params);
8709 static __cold int io_probe(struct io_ring_ctx *ctx, void __user *arg,
8712 struct io_uring_probe *p;
8716 size = struct_size(p, ops, nr_args);
8717 if (size == SIZE_MAX)
8719 p = kzalloc(size, GFP_KERNEL);
8724 if (copy_from_user(p, arg, size))
8727 if (memchr_inv(p, 0, size))
8730 p->last_op = IORING_OP_LAST - 1;
8731 if (nr_args > IORING_OP_LAST)
8732 nr_args = IORING_OP_LAST;
8734 for (i = 0; i < nr_args; i++) {
8736 if (!io_op_defs[i].not_supported)
8737 p->ops[i].flags = IO_URING_OP_SUPPORTED;
8742 if (copy_to_user(arg, p, size))
8749 static int io_register_personality(struct io_ring_ctx *ctx)
8751 const struct cred *creds;
8755 creds = get_current_cred();
8757 ret = xa_alloc_cyclic(&ctx->personalities, &id, (void *)creds,
8758 XA_LIMIT(0, USHRT_MAX), &ctx->pers_next, GFP_KERNEL);
8766 static __cold int io_register_restrictions(struct io_ring_ctx *ctx,
8767 void __user *arg, unsigned int nr_args)
8769 struct io_uring_restriction *res;
8773 /* Restrictions allowed only if rings started disabled */
8774 if (!(ctx->flags & IORING_SETUP_R_DISABLED))
8777 /* We allow only a single restrictions registration */
8778 if (ctx->restrictions.registered)
8781 if (!arg || nr_args > IORING_MAX_RESTRICTIONS)
8784 size = array_size(nr_args, sizeof(*res));
8785 if (size == SIZE_MAX)
8788 res = memdup_user(arg, size);
8790 return PTR_ERR(res);
8794 for (i = 0; i < nr_args; i++) {
8795 switch (res[i].opcode) {
8796 case IORING_RESTRICTION_REGISTER_OP:
8797 if (res[i].register_op >= IORING_REGISTER_LAST) {
8802 __set_bit(res[i].register_op,
8803 ctx->restrictions.register_op);
8805 case IORING_RESTRICTION_SQE_OP:
8806 if (res[i].sqe_op >= IORING_OP_LAST) {
8811 __set_bit(res[i].sqe_op, ctx->restrictions.sqe_op);
8813 case IORING_RESTRICTION_SQE_FLAGS_ALLOWED:
8814 ctx->restrictions.sqe_flags_allowed = res[i].sqe_flags;
8816 case IORING_RESTRICTION_SQE_FLAGS_REQUIRED:
8817 ctx->restrictions.sqe_flags_required = res[i].sqe_flags;
8826 /* Reset all restrictions if an error happened */
8828 memset(&ctx->restrictions, 0, sizeof(ctx->restrictions));
8830 ctx->restrictions.registered = true;
8836 static int io_register_enable_rings(struct io_ring_ctx *ctx)
8838 if (!(ctx->flags & IORING_SETUP_R_DISABLED))
8841 if (ctx->restrictions.registered)
8842 ctx->restricted = 1;
8844 ctx->flags &= ~IORING_SETUP_R_DISABLED;
8845 if (ctx->sq_data && wq_has_sleeper(&ctx->sq_data->wait))
8846 wake_up(&ctx->sq_data->wait);
8850 static int __io_register_rsrc_update(struct io_ring_ctx *ctx, unsigned type,
8851 struct io_uring_rsrc_update2 *up,
8857 if (check_add_overflow(up->offset, nr_args, &tmp))
8859 err = io_rsrc_node_switch_start(ctx);
8864 case IORING_RSRC_FILE:
8865 return __io_sqe_files_update(ctx, up, nr_args);
8866 case IORING_RSRC_BUFFER:
8867 return __io_sqe_buffers_update(ctx, up, nr_args);
8872 static int io_register_files_update(struct io_ring_ctx *ctx, void __user *arg,
8875 struct io_uring_rsrc_update2 up;
8879 memset(&up, 0, sizeof(up));
8880 if (copy_from_user(&up, arg, sizeof(struct io_uring_rsrc_update)))
8882 if (up.resv || up.resv2)
8884 return __io_register_rsrc_update(ctx, IORING_RSRC_FILE, &up, nr_args);
8887 static int io_register_rsrc_update(struct io_ring_ctx *ctx, void __user *arg,
8888 unsigned size, unsigned type)
8890 struct io_uring_rsrc_update2 up;
8892 if (size != sizeof(up))
8894 if (copy_from_user(&up, arg, sizeof(up)))
8896 if (!up.nr || up.resv || up.resv2)
8898 return __io_register_rsrc_update(ctx, type, &up, up.nr);
8901 static __cold int io_register_rsrc(struct io_ring_ctx *ctx, void __user *arg,
8902 unsigned int size, unsigned int type)
8904 struct io_uring_rsrc_register rr;
8906 /* keep it extendible */
8907 if (size != sizeof(rr))
8910 memset(&rr, 0, sizeof(rr));
8911 if (copy_from_user(&rr, arg, size))
8913 if (!rr.nr || rr.resv2)
8915 if (rr.flags & ~IORING_RSRC_REGISTER_SPARSE)
8919 case IORING_RSRC_FILE:
8920 if (rr.flags & IORING_RSRC_REGISTER_SPARSE && rr.data)
8922 return io_sqe_files_register(ctx, u64_to_user_ptr(rr.data),
8923 rr.nr, u64_to_user_ptr(rr.tags));
8924 case IORING_RSRC_BUFFER:
8925 if (rr.flags & IORING_RSRC_REGISTER_SPARSE && rr.data)
8927 return io_sqe_buffers_register(ctx, u64_to_user_ptr(rr.data),
8928 rr.nr, u64_to_user_ptr(rr.tags));
8933 static __cold int io_register_iowq_aff(struct io_ring_ctx *ctx,
8934 void __user *arg, unsigned len)
8936 struct io_uring_task *tctx = current->io_uring;
8937 cpumask_var_t new_mask;
8940 if (!tctx || !tctx->io_wq)
8943 if (!alloc_cpumask_var(&new_mask, GFP_KERNEL))
8946 cpumask_clear(new_mask);
8947 if (len > cpumask_size())
8948 len = cpumask_size();
8950 if (in_compat_syscall()) {
8951 ret = compat_get_bitmap(cpumask_bits(new_mask),
8952 (const compat_ulong_t __user *)arg,
8953 len * 8 /* CHAR_BIT */);
8955 ret = copy_from_user(new_mask, arg, len);
8959 free_cpumask_var(new_mask);
8963 ret = io_wq_cpu_affinity(tctx->io_wq, new_mask);
8964 free_cpumask_var(new_mask);
8968 static __cold int io_unregister_iowq_aff(struct io_ring_ctx *ctx)
8970 struct io_uring_task *tctx = current->io_uring;
8972 if (!tctx || !tctx->io_wq)
8975 return io_wq_cpu_affinity(tctx->io_wq, NULL);
8978 static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
8980 __must_hold(&ctx->uring_lock)
8982 struct io_tctx_node *node;
8983 struct io_uring_task *tctx = NULL;
8984 struct io_sq_data *sqd = NULL;
8988 if (copy_from_user(new_count, arg, sizeof(new_count)))
8990 for (i = 0; i < ARRAY_SIZE(new_count); i++)
8991 if (new_count[i] > INT_MAX)
8994 if (ctx->flags & IORING_SETUP_SQPOLL) {
8998 * Observe the correct sqd->lock -> ctx->uring_lock
8999 * ordering. Fine to drop uring_lock here, we hold
9002 refcount_inc(&sqd->refs);
9003 mutex_unlock(&ctx->uring_lock);
9004 mutex_lock(&sqd->lock);
9005 mutex_lock(&ctx->uring_lock);
9007 tctx = sqd->thread->io_uring;
9010 tctx = current->io_uring;
9013 BUILD_BUG_ON(sizeof(new_count) != sizeof(ctx->iowq_limits));
9015 for (i = 0; i < ARRAY_SIZE(new_count); i++)
9017 ctx->iowq_limits[i] = new_count[i];
9018 ctx->iowq_limits_set = true;
9020 if (tctx && tctx->io_wq) {
9021 ret = io_wq_max_workers(tctx->io_wq, new_count);
9025 memset(new_count, 0, sizeof(new_count));
9029 mutex_unlock(&sqd->lock);
9030 io_put_sq_data(sqd);
9033 if (copy_to_user(arg, new_count, sizeof(new_count)))
9036 /* that's it for SQPOLL, only the SQPOLL task creates requests */
9040 /* now propagate the restriction to all registered users */
9041 list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
9042 struct io_uring_task *tctx = node->task->io_uring;
9044 if (WARN_ON_ONCE(!tctx->io_wq))
9047 for (i = 0; i < ARRAY_SIZE(new_count); i++)
9048 new_count[i] = ctx->iowq_limits[i];
9049 /* ignore errors, it always returns zero anyway */
9050 (void)io_wq_max_workers(tctx->io_wq, new_count);
9055 mutex_unlock(&sqd->lock);
9056 io_put_sq_data(sqd);
9061 static int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
9063 struct io_uring_buf_ring *br;
9064 struct io_uring_buf_reg reg;
9065 struct io_buffer_list *bl, *free_bl = NULL;
9066 struct page **pages;
9069	 if (copy_from_user(&reg, arg, sizeof(reg)))
9072 if (reg.pad || reg.resv[0] || reg.resv[1] || reg.resv[2])
9076 if (reg.ring_addr & ~PAGE_MASK)
9078 if (!is_power_of_2(reg.ring_entries))
9081 /* cannot disambiguate full vs empty due to head/tail size */
9082 if (reg.ring_entries >= 65536)
9085 if (unlikely(reg.bgid < BGID_ARRAY && !ctx->io_bl)) {
9086 int ret = io_init_bl_list(ctx);
9091 bl = io_buffer_get_list(ctx, reg.bgid);
9093 /* if mapped buffer ring OR classic exists, don't allow */
9094 if (bl->buf_nr_pages || !list_empty(&bl->buf_list))
9097 free_bl = bl = kzalloc(sizeof(*bl), GFP_KERNEL);
9102 pages = io_pin_pages(reg.ring_addr,
9103 struct_size(br, bufs, reg.ring_entries),
9105 if (IS_ERR(pages)) {
9107 return PTR_ERR(pages);
9110 br = page_address(pages[0]);
9111 bl->buf_pages = pages;
9112 bl->buf_nr_pages = nr_pages;
9113 bl->nr_entries = reg.ring_entries;
9115 bl->mask = reg.ring_entries - 1;
9116 io_buffer_add_list(ctx, bl, reg.bgid);
9120 static int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
9122 struct io_uring_buf_reg reg;
9123 struct io_buffer_list *bl;
9125	 if (copy_from_user(&reg, arg, sizeof(reg)))
9127 if (reg.pad || reg.resv[0] || reg.resv[1] || reg.resv[2])
9130 bl = io_buffer_get_list(ctx, reg.bgid);
9133 if (!bl->buf_nr_pages)
9136 __io_remove_buffers(ctx, bl, -1U);
9137 if (bl->bgid >= BGID_ARRAY) {
9138 xa_erase(&ctx->io_bl_xa, bl->bgid);
9144 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
9145 void __user *arg, unsigned nr_args)
9146 __releases(ctx->uring_lock)
9147 __acquires(ctx->uring_lock)
9152 * We're inside the ring mutex, if the ref is already dying, then
9153 * someone else killed the ctx or is already going through
9154 * io_uring_register().
9156 if (percpu_ref_is_dying(&ctx->refs))
9159 if (ctx->restricted) {
9160 if (opcode >= IORING_REGISTER_LAST)
9162 opcode = array_index_nospec(opcode, IORING_REGISTER_LAST);
9163 if (!test_bit(opcode, ctx->restrictions.register_op))
9168 case IORING_REGISTER_BUFFERS:
9172 ret = io_sqe_buffers_register(ctx, arg, nr_args, NULL);
9174 case IORING_UNREGISTER_BUFFERS:
9178 ret = io_sqe_buffers_unregister(ctx);
9180 case IORING_REGISTER_FILES:
9184 ret = io_sqe_files_register(ctx, arg, nr_args, NULL);
9186 case IORING_UNREGISTER_FILES:
9190 ret = io_sqe_files_unregister(ctx);
9192 case IORING_REGISTER_FILES_UPDATE:
9193 ret = io_register_files_update(ctx, arg, nr_args);
9195 case IORING_REGISTER_EVENTFD:
9199 ret = io_eventfd_register(ctx, arg, 0);
9201 case IORING_REGISTER_EVENTFD_ASYNC:
9205 ret = io_eventfd_register(ctx, arg, 1);
9207 case IORING_UNREGISTER_EVENTFD:
9211 ret = io_eventfd_unregister(ctx);
9213 case IORING_REGISTER_PROBE:
9215 if (!arg || nr_args > 256)
9217 ret = io_probe(ctx, arg, nr_args);
9219 case IORING_REGISTER_PERSONALITY:
9223 ret = io_register_personality(ctx);
9225 case IORING_UNREGISTER_PERSONALITY:
9229 ret = io_unregister_personality(ctx, nr_args);
9231 case IORING_REGISTER_ENABLE_RINGS:
9235 ret = io_register_enable_rings(ctx);
9237 case IORING_REGISTER_RESTRICTIONS:
9238 ret = io_register_restrictions(ctx, arg, nr_args);
9240 case IORING_REGISTER_FILES2:
9241 ret = io_register_rsrc(ctx, arg, nr_args, IORING_RSRC_FILE);
9243 case IORING_REGISTER_FILES_UPDATE2:
9244 ret = io_register_rsrc_update(ctx, arg, nr_args,
9247 case IORING_REGISTER_BUFFERS2:
9248 ret = io_register_rsrc(ctx, arg, nr_args, IORING_RSRC_BUFFER);
9250 case IORING_REGISTER_BUFFERS_UPDATE:
9251 ret = io_register_rsrc_update(ctx, arg, nr_args,
9252 IORING_RSRC_BUFFER);
9254 case IORING_REGISTER_IOWQ_AFF:
9256 if (!arg || !nr_args)
9258 ret = io_register_iowq_aff(ctx, arg, nr_args);
9260 case IORING_UNREGISTER_IOWQ_AFF:
9264 ret = io_unregister_iowq_aff(ctx);
9266 case IORING_REGISTER_IOWQ_MAX_WORKERS:
9268 if (!arg || nr_args != 2)
9270 ret = io_register_iowq_max_workers(ctx, arg);
9272 case IORING_REGISTER_RING_FDS:
9273 ret = io_ringfd_register(ctx, arg, nr_args);
9275 case IORING_UNREGISTER_RING_FDS:
9276 ret = io_ringfd_unregister(ctx, arg, nr_args);
9278 case IORING_REGISTER_PBUF_RING:
9280 if (!arg || nr_args != 1)
9282 ret = io_register_pbuf_ring(ctx, arg);
9284 case IORING_UNREGISTER_PBUF_RING:
9286 if (!arg || nr_args != 1)
9288 ret = io_unregister_pbuf_ring(ctx, arg);
9298 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
9299 void __user *, arg, unsigned int, nr_args)
9301 struct io_ring_ctx *ctx;
9310 if (f.file->f_op != &io_uring_fops)
9313 ctx = f.file->private_data;
9317 mutex_lock(&ctx->uring_lock);
9318 ret = __io_uring_register(ctx, opcode, arg, nr_args);
9319 mutex_unlock(&ctx->uring_lock);
9320 trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, ret);
9326 static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags)
9332 static const struct io_op_def io_op_defs[] = {
9336 .prep = io_nop_prep,
9339 [IORING_OP_READV] = {
9341 .unbound_nonreg_file = 1,
9348 .async_size = sizeof(struct io_async_rw),
9351 .prep_async = io_readv_prep_async,
9352 .cleanup = io_readv_writev_cleanup,
9354 [IORING_OP_WRITEV] = {
9357 .unbound_nonreg_file = 1,
9363 .async_size = sizeof(struct io_async_rw),
9366 .prep_async = io_writev_prep_async,
9367 .cleanup = io_readv_writev_cleanup,
9369 [IORING_OP_FSYNC] = {
9372 .prep = io_fsync_prep,
9375 [IORING_OP_READ_FIXED] = {
9377 .unbound_nonreg_file = 1,
9383 .async_size = sizeof(struct io_async_rw),
9387 [IORING_OP_WRITE_FIXED] = {
9390 .unbound_nonreg_file = 1,
9396 .async_size = sizeof(struct io_async_rw),
9400 [IORING_OP_POLL_ADD] = {
9402 .unbound_nonreg_file = 1,
9404 .prep = io_poll_add_prep,
9405 .issue = io_poll_add,
9407 [IORING_OP_POLL_REMOVE] = {
9409 .prep = io_poll_remove_prep,
9410 .issue = io_poll_remove,
9412 [IORING_OP_SYNC_FILE_RANGE] = {
9415 .prep = io_sfr_prep,
9416 .issue = io_sync_file_range,
9418 [IORING_OP_SENDMSG] = {
9420 .unbound_nonreg_file = 1,
9423 #if defined(CONFIG_NET)
9424 .async_size = sizeof(struct io_async_msghdr),
9425 .prep = io_sendmsg_prep,
9426 .issue = io_sendmsg,
9427 .prep_async = io_sendmsg_prep_async,
9428 .cleanup = io_sendmsg_recvmsg_cleanup,
9430 .prep = io_eopnotsupp_prep,
9433 [IORING_OP_RECVMSG] = {
9435 .unbound_nonreg_file = 1,
9439 #if defined(CONFIG_NET)
9440 .async_size = sizeof(struct io_async_msghdr),
9441 .prep = io_recvmsg_prep,
9442 .issue = io_recvmsg,
9443 .prep_async = io_recvmsg_prep_async,
9444 .cleanup = io_sendmsg_recvmsg_cleanup,
9446 .prep = io_eopnotsupp_prep,
9449 [IORING_OP_TIMEOUT] = {
9451 .async_size = sizeof(struct io_timeout_data),
9452 .prep = io_timeout_prep,
9453 .issue = io_timeout,
9455 [IORING_OP_TIMEOUT_REMOVE] = {
9456 /* used by timeout updates' prep() */
9458 .prep = io_timeout_remove_prep,
9459 .issue = io_timeout_remove,
9461 [IORING_OP_ACCEPT] = {
9463 .unbound_nonreg_file = 1,
9465 .poll_exclusive = 1,
9466 .ioprio = 1, /* used for flags */
9467 #if defined(CONFIG_NET)
9468 .prep = io_accept_prep,
9471 .prep = io_eopnotsupp_prep,
9474 [IORING_OP_ASYNC_CANCEL] = {
9476 .prep = io_async_cancel_prep,
9477 .issue = io_async_cancel,
9479 [IORING_OP_LINK_TIMEOUT] = {
9481 .async_size = sizeof(struct io_timeout_data),
9482 .prep = io_link_timeout_prep,
9483 .issue = io_no_issue,
9485 [IORING_OP_CONNECT] = {
9487 .unbound_nonreg_file = 1,
9489 #if defined(CONFIG_NET)
9490 .async_size = sizeof(struct io_async_connect),
9491 .prep = io_connect_prep,
9492 .issue = io_connect,
9493 .prep_async = io_connect_prep_async,
9495 .prep = io_eopnotsupp_prep,
9498 [IORING_OP_FALLOCATE] = {
9500 .prep = io_fallocate_prep,
9501 .issue = io_fallocate,
9503 [IORING_OP_OPENAT] = {
9504 .prep = io_openat_prep,
9506 .cleanup = io_open_cleanup,
9508 [IORING_OP_CLOSE] = {
9509 .prep = io_close_prep,
9512 [IORING_OP_FILES_UPDATE] = {
9515 .prep = io_files_update_prep,
9516 .issue = io_files_update,
9518 [IORING_OP_STATX] = {
9520 .prep = io_statx_prep,
9522 .cleanup = io_statx_cleanup,
9524 [IORING_OP_READ] = {
9526 .unbound_nonreg_file = 1,
9533 .async_size = sizeof(struct io_async_rw),
9537 [IORING_OP_WRITE] = {
9540 .unbound_nonreg_file = 1,
9546 .async_size = sizeof(struct io_async_rw),
9550 [IORING_OP_FADVISE] = {
9553 .prep = io_fadvise_prep,
9554 .issue = io_fadvise,
9556 [IORING_OP_MADVISE] = {
9557 .prep = io_madvise_prep,
9558 .issue = io_madvise,
9560 [IORING_OP_SEND] = {
9562 .unbound_nonreg_file = 1,
9566 #if defined(CONFIG_NET)
9567 .prep = io_sendmsg_prep,
9570 .prep = io_eopnotsupp_prep,
9573 [IORING_OP_RECV] = {
9575 .unbound_nonreg_file = 1,
9580 #if defined(CONFIG_NET)
9581 .prep = io_recvmsg_prep,
9584 .prep = io_eopnotsupp_prep,
9587 [IORING_OP_OPENAT2] = {
9588 .prep = io_openat2_prep,
9589 .issue = io_openat2,
9590 .cleanup = io_open_cleanup,
9592 [IORING_OP_EPOLL_CTL] = {
9593 .unbound_nonreg_file = 1,
9595 #if defined(CONFIG_EPOLL)
9596 .prep = io_epoll_ctl_prep,
9597 .issue = io_epoll_ctl,
9599 .prep = io_eopnotsupp_prep,
9602 [IORING_OP_SPLICE] = {
9605 .unbound_nonreg_file = 1,
9607 .prep = io_splice_prep,
9610 [IORING_OP_PROVIDE_BUFFERS] = {
9613 .prep = io_provide_buffers_prep,
9614 .issue = io_provide_buffers,
9616 [IORING_OP_REMOVE_BUFFERS] = {
9619 .prep = io_remove_buffers_prep,
9620 .issue = io_remove_buffers,
9625 .unbound_nonreg_file = 1,
9627 .prep = io_tee_prep,
9630 [IORING_OP_SHUTDOWN] = {
9632 #if defined(CONFIG_NET)
9633 .prep = io_shutdown_prep,
9634 .issue = io_shutdown,
9636 .prep = io_eopnotsupp_prep,
9639 [IORING_OP_RENAMEAT] = {
9640 .prep = io_renameat_prep,
9641 .issue = io_renameat,
9642 .cleanup = io_renameat_cleanup,
9644 [IORING_OP_UNLINKAT] = {
9645 .prep = io_unlinkat_prep,
9646 .issue = io_unlinkat,
9647 .cleanup = io_unlinkat_cleanup,
9649 [IORING_OP_MKDIRAT] = {
9650 .prep = io_mkdirat_prep,
9651 .issue = io_mkdirat,
9652 .cleanup = io_mkdirat_cleanup,
9654 [IORING_OP_SYMLINKAT] = {
9655 .prep = io_symlinkat_prep,
9656 .issue = io_symlinkat,
9657 .cleanup = io_link_cleanup,
9659 [IORING_OP_LINKAT] = {
9660 .prep = io_linkat_prep,
9662 .cleanup = io_link_cleanup,
9664 [IORING_OP_MSG_RING] = {
9667 .prep = io_msg_ring_prep,
9668 .issue = io_msg_ring,
9670 [IORING_OP_FSETXATTR] = {
9672 .prep = io_fsetxattr_prep,
9673 .issue = io_fsetxattr,
9674 .cleanup = io_xattr_cleanup,
9676 [IORING_OP_SETXATTR] = {
9677 .prep = io_setxattr_prep,
9678 .issue = io_setxattr,
9679 .cleanup = io_xattr_cleanup,
9681 [IORING_OP_FGETXATTR] = {
9683 .prep = io_fgetxattr_prep,
9684 .issue = io_fgetxattr,
9685 .cleanup = io_xattr_cleanup,
9687 [IORING_OP_GETXATTR] = {
9688 .prep = io_getxattr_prep,
9689 .issue = io_getxattr,
9690 .cleanup = io_xattr_cleanup,
9692 [IORING_OP_SOCKET] = {
9694 #if defined(CONFIG_NET)
9695 .prep = io_socket_prep,
9698 .prep = io_eopnotsupp_prep,
9701 [IORING_OP_URING_CMD] = {
9704 .async_size = uring_cmd_pdu_size(1),
9705 .prep = io_uring_cmd_prep,
9706 .issue = io_uring_cmd,
9707 .prep_async = io_uring_cmd_prep_async,
9711 static int __init io_uring_init(void)
9715 #define __BUILD_BUG_VERIFY_ELEMENT(stype, eoffset, etype, ename) do { \
9716 BUILD_BUG_ON(offsetof(stype, ename) != eoffset); \
9717 BUILD_BUG_ON(sizeof(etype) != sizeof_field(stype, ename)); \
9720 #define BUILD_BUG_SQE_ELEM(eoffset, etype, ename) \
9721 __BUILD_BUG_VERIFY_ELEMENT(struct io_uring_sqe, eoffset, etype, ename)
9722 BUILD_BUG_ON(sizeof(struct io_uring_sqe) != 64);
9723 BUILD_BUG_SQE_ELEM(0, __u8, opcode);
9724 BUILD_BUG_SQE_ELEM(1, __u8, flags);
9725 BUILD_BUG_SQE_ELEM(2, __u16, ioprio);
9726 BUILD_BUG_SQE_ELEM(4, __s32, fd);
9727 BUILD_BUG_SQE_ELEM(8, __u64, off);
9728 BUILD_BUG_SQE_ELEM(8, __u64, addr2);
9729 BUILD_BUG_SQE_ELEM(16, __u64, addr);
9730 BUILD_BUG_SQE_ELEM(16, __u64, splice_off_in);
9731 BUILD_BUG_SQE_ELEM(24, __u32, len);
9732 BUILD_BUG_SQE_ELEM(28, __kernel_rwf_t, rw_flags);
9733 BUILD_BUG_SQE_ELEM(28, /* compat */ int, rw_flags);
9734 BUILD_BUG_SQE_ELEM(28, /* compat */ __u32, rw_flags);
9735 BUILD_BUG_SQE_ELEM(28, __u32, fsync_flags);
9736 BUILD_BUG_SQE_ELEM(28, /* compat */ __u16, poll_events);
9737 BUILD_BUG_SQE_ELEM(28, __u32, poll32_events);
9738 BUILD_BUG_SQE_ELEM(28, __u32, sync_range_flags);
9739 BUILD_BUG_SQE_ELEM(28, __u32, msg_flags);
9740 BUILD_BUG_SQE_ELEM(28, __u32, timeout_flags);
9741 BUILD_BUG_SQE_ELEM(28, __u32, accept_flags);
9742 BUILD_BUG_SQE_ELEM(28, __u32, cancel_flags);
9743 BUILD_BUG_SQE_ELEM(28, __u32, open_flags);
9744 BUILD_BUG_SQE_ELEM(28, __u32, statx_flags);
9745 BUILD_BUG_SQE_ELEM(28, __u32, fadvise_advice);
9746 BUILD_BUG_SQE_ELEM(28, __u32, splice_flags);
9747 BUILD_BUG_SQE_ELEM(32, __u64, user_data);
9748 BUILD_BUG_SQE_ELEM(40, __u16, buf_index);
9749 BUILD_BUG_SQE_ELEM(40, __u16, buf_group);
9750 BUILD_BUG_SQE_ELEM(42, __u16, personality);
9751 BUILD_BUG_SQE_ELEM(44, __s32, splice_fd_in);
9752 BUILD_BUG_SQE_ELEM(44, __u32, file_index);
9753 BUILD_BUG_SQE_ELEM(48, __u64, addr3);
9755 BUILD_BUG_ON(sizeof(struct io_uring_files_update) !=
9756 sizeof(struct io_uring_rsrc_update));
9757 BUILD_BUG_ON(sizeof(struct io_uring_rsrc_update) >
9758 sizeof(struct io_uring_rsrc_update2));
9760 /* ->buf_index is u16 */
9761 BUILD_BUG_ON(IORING_MAX_REG_BUFFERS >= (1u << 16));
9762 BUILD_BUG_ON(BGID_ARRAY * sizeof(struct io_buffer_list) > PAGE_SIZE);
9763 BUILD_BUG_ON(offsetof(struct io_uring_buf_ring, bufs) != 0);
9764 BUILD_BUG_ON(offsetof(struct io_uring_buf, resv) !=
9765 offsetof(struct io_uring_buf_ring, tail));
9767 /* should fit into one byte */
9768 BUILD_BUG_ON(SQE_VALID_FLAGS >= (1 << 8));
9769 BUILD_BUG_ON(SQE_COMMON_FLAGS >= (1 << 8));
9770 BUILD_BUG_ON((SQE_VALID_FLAGS | SQE_COMMON_FLAGS) != SQE_VALID_FLAGS);
9772 BUILD_BUG_ON(ARRAY_SIZE(io_op_defs) != IORING_OP_LAST);
9773 BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof(int));
9775 BUILD_BUG_ON(sizeof(atomic_t) != sizeof(u32));
9777 for (i = 0; i < ARRAY_SIZE(io_op_defs); i++) {
9778 BUG_ON(!io_op_defs[i].prep);
9779 if (io_op_defs[i].prep != io_eopnotsupp_prep)
9780 BUG_ON(!io_op_defs[i].issue);
9783 req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC |
9787 __initcall(io_uring_init);