#include <linux/mman.h>
#include <linux/percpu.h>
#include <linux/slab.h>
-#include <linux/kthread.h>
#include <linux/blkdev.h>
#include <linux/bvec.h>
#include <linux/net.h>
bool registered;
};
+enum {
+ IO_SQ_THREAD_SHOULD_STOP = 0,
+ IO_SQ_THREAD_SHOULD_PARK,
+};
+
struct io_sq_data {
refcount_t refs;
struct mutex lock;
struct wait_queue_head wait;
unsigned sq_thread_idle;
+ int sq_cpu;
+ pid_t task_pid;
+
+ unsigned long state;
+ struct completion startup;
+ struct completion completion;
+ struct completion exited;
};
#define IO_IOPOLL_BATCH 8
struct {
unsigned int flags;
unsigned int compat: 1;
- unsigned int limit_mem: 1;
unsigned int cq_overflow_flushed: 1;
unsigned int drain_next: 1;
unsigned int eventfd_async: 1;
unsigned int restricted: 1;
- unsigned int sqo_dead: 1;
+ unsigned int sqo_exec: 1;
/*
* Ring buffer of indices into array of io_uring_sqe, which is
unsigned cached_cq_overflow;
unsigned long sq_check_overflow;
+ /* hashed buffered write serialization */
+ struct io_wq_hash *hash_map;
+
struct list_head defer_list;
struct list_head timeout_list;
struct list_head cq_overflow_list;
struct io_rings *rings;
- /* IO offload */
- struct io_wq *io_wq;
-
- /*
- * For SQPOLL usage - we hold a reference to the parent task, so we
- * have access to the ->files
- */
- struct task_struct *sqo_task;
-
/* Only used for accounting purposes */
struct mm_struct *mm_account;
-#ifdef CONFIG_BLK_CGROUP
- struct cgroup_subsys_state *sqo_blkcg_css;
-#endif
-
struct io_sq_data *sq_data; /* if using sq thread polling */
struct wait_queue_head sqo_sq_wait;
struct user_struct *user;
- const struct cred *creds;
-
-#ifdef CONFIG_AUDIT
- kuid_t loginuid;
- unsigned int sessionid;
-#endif
-
struct completion ref_comp;
struct completion sq_thread_comp;
struct io_restriction restrictions;
+ /* exit task_work */
+ struct callback_head *exit_task_work;
+
+ struct wait_queue_head hash_wait;
+
/* Keep this last, we don't need it for the fast path */
struct work_struct exit_work;
};
REQ_F_POLLED_BIT,
REQ_F_BUFFER_SELECTED_BIT,
REQ_F_NO_FILE_TABLE_BIT,
- REQ_F_WORK_INITIALIZED_BIT,
REQ_F_LTIMEOUT_ACTIVE_BIT,
REQ_F_COMPLETE_INLINE_BIT,
REQ_F_BUFFER_SELECTED = BIT(REQ_F_BUFFER_SELECTED_BIT),
/* doesn't need file table for this request */
REQ_F_NO_FILE_TABLE = BIT(REQ_F_NO_FILE_TABLE_BIT),
- /* io_wq_work is initialized */
- REQ_F_WORK_INITIALIZED = BIT(REQ_F_WORK_INITIALIZED_BIT),
/* linked timeout is active, i.e. prepared by link's head */
REQ_F_LTIMEOUT_ACTIVE = BIT(REQ_F_LTIMEOUT_ACTIVE_BIT),
/* completion is deferred through io_comp_state */
unsigned plug : 1;
/* size of async data needed, if any */
unsigned short async_size;
- unsigned work_flags;
};
static const struct io_op_def io_op_defs[] = {
.needs_async_data = 1,
.plug = 1,
.async_size = sizeof(struct io_async_rw),
- .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG,
},
[IORING_OP_WRITEV] = {
.needs_file = 1,
.needs_async_data = 1,
.plug = 1,
.async_size = sizeof(struct io_async_rw),
- .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG |
- IO_WQ_WORK_FSIZE,
},
[IORING_OP_FSYNC] = {
.needs_file = 1,
- .work_flags = IO_WQ_WORK_BLKCG,
},
[IORING_OP_READ_FIXED] = {
.needs_file = 1,
.pollin = 1,
.plug = 1,
.async_size = sizeof(struct io_async_rw),
- .work_flags = IO_WQ_WORK_BLKCG | IO_WQ_WORK_MM,
},
[IORING_OP_WRITE_FIXED] = {
.needs_file = 1,
.pollout = 1,
.plug = 1,
.async_size = sizeof(struct io_async_rw),
- .work_flags = IO_WQ_WORK_BLKCG | IO_WQ_WORK_FSIZE |
- IO_WQ_WORK_MM,
},
[IORING_OP_POLL_ADD] = {
.needs_file = 1,
[IORING_OP_POLL_REMOVE] = {},
[IORING_OP_SYNC_FILE_RANGE] = {
.needs_file = 1,
- .work_flags = IO_WQ_WORK_BLKCG,
},
[IORING_OP_SENDMSG] = {
.needs_file = 1,
.pollout = 1,
.needs_async_data = 1,
.async_size = sizeof(struct io_async_msghdr),
- .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG,
},
[IORING_OP_RECVMSG] = {
.needs_file = 1,
.buffer_select = 1,
.needs_async_data = 1,
.async_size = sizeof(struct io_async_msghdr),
- .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG,
},
[IORING_OP_TIMEOUT] = {
.needs_async_data = 1,
.async_size = sizeof(struct io_timeout_data),
- .work_flags = IO_WQ_WORK_MM,
},
[IORING_OP_TIMEOUT_REMOVE] = {
/* used by timeout updates' prep() */
- .work_flags = IO_WQ_WORK_MM,
},
[IORING_OP_ACCEPT] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollin = 1,
- .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_FILES,
},
[IORING_OP_ASYNC_CANCEL] = {},
[IORING_OP_LINK_TIMEOUT] = {
.needs_async_data = 1,
.async_size = sizeof(struct io_timeout_data),
- .work_flags = IO_WQ_WORK_MM,
},
[IORING_OP_CONNECT] = {
.needs_file = 1,
.pollout = 1,
.needs_async_data = 1,
.async_size = sizeof(struct io_async_connect),
- .work_flags = IO_WQ_WORK_MM,
},
[IORING_OP_FALLOCATE] = {
.needs_file = 1,
- .work_flags = IO_WQ_WORK_BLKCG | IO_WQ_WORK_FSIZE,
- },
- [IORING_OP_OPENAT] = {
- .work_flags = IO_WQ_WORK_FILES | IO_WQ_WORK_BLKCG |
- IO_WQ_WORK_FS | IO_WQ_WORK_MM,
- },
- [IORING_OP_CLOSE] = {
- .work_flags = IO_WQ_WORK_FILES | IO_WQ_WORK_BLKCG,
- },
- [IORING_OP_FILES_UPDATE] = {
- .work_flags = IO_WQ_WORK_FILES | IO_WQ_WORK_MM,
- },
- [IORING_OP_STATX] = {
- .work_flags = IO_WQ_WORK_FILES | IO_WQ_WORK_MM |
- IO_WQ_WORK_FS | IO_WQ_WORK_BLKCG,
},
+ [IORING_OP_OPENAT] = {},
+ [IORING_OP_CLOSE] = {},
+ [IORING_OP_FILES_UPDATE] = {},
+ [IORING_OP_STATX] = {},
[IORING_OP_READ] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.buffer_select = 1,
.plug = 1,
.async_size = sizeof(struct io_async_rw),
- .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG,
},
[IORING_OP_WRITE] = {
.needs_file = 1,
.pollout = 1,
.plug = 1,
.async_size = sizeof(struct io_async_rw),
- .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG |
- IO_WQ_WORK_FSIZE,
},
[IORING_OP_FADVISE] = {
.needs_file = 1,
- .work_flags = IO_WQ_WORK_BLKCG,
- },
- [IORING_OP_MADVISE] = {
- .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG,
},
+ [IORING_OP_MADVISE] = {},
[IORING_OP_SEND] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollout = 1,
- .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG,
},
[IORING_OP_RECV] = {
.needs_file = 1,
.unbound_nonreg_file = 1,
.pollin = 1,
.buffer_select = 1,
- .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG,
},
[IORING_OP_OPENAT2] = {
- .work_flags = IO_WQ_WORK_FILES | IO_WQ_WORK_FS |
- IO_WQ_WORK_BLKCG | IO_WQ_WORK_MM,
},
[IORING_OP_EPOLL_CTL] = {
.unbound_nonreg_file = 1,
- .work_flags = IO_WQ_WORK_FILES,
},
[IORING_OP_SPLICE] = {
.needs_file = 1,
.hash_reg_file = 1,
.unbound_nonreg_file = 1,
- .work_flags = IO_WQ_WORK_BLKCG,
},
[IORING_OP_PROVIDE_BUFFERS] = {},
[IORING_OP_REMOVE_BUFFERS] = {},
[IORING_OP_SHUTDOWN] = {
.needs_file = 1,
},
- [IORING_OP_RENAMEAT] = {
- .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_FILES |
- IO_WQ_WORK_FS | IO_WQ_WORK_BLKCG,
- },
- [IORING_OP_UNLINKAT] = {
- .work_flags = IO_WQ_WORK_MM | IO_WQ_WORK_FILES |
- IO_WQ_WORK_FS | IO_WQ_WORK_BLKCG,
- },
+ [IORING_OP_RENAMEAT] = {},
+ [IORING_OP_UNLINKAT] = {},
};
static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
struct task_struct *task,
struct files_struct *files);
+static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx);
static void destroy_fixed_rsrc_ref_node(struct fixed_rsrc_ref_node *ref_node);
static struct fixed_rsrc_ref_node *alloc_fixed_rsrc_ref_node(
struct io_ring_ctx *ctx);
}
}
-static bool io_refs_resurrect(struct percpu_ref *ref, struct completion *compl)
-{
- if (!percpu_ref_tryget(ref)) {
- /* already at zero, wait for ->release() */
- if (!try_wait_for_completion(compl))
- synchronize_rcu();
- return false;
- }
-
- percpu_ref_resurrect(ref);
- reinit_completion(compl);
- percpu_ref_put(ref);
- return true;
-}
-
static bool io_match_task(struct io_kiocb *head,
struct task_struct *task,
struct files_struct *files)
return true;
io_for_each_link(req, head) {
- if (!(req->flags & REQ_F_WORK_INITIALIZED))
- continue;
if (req->file && req->file->f_op == &io_uring_fops)
return true;
- if ((req->work.flags & IO_WQ_WORK_FILES) &&
- req->work.identity->files == files)
+ if (req->task->files == files)
return true;
}
return false;
}
-static void io_sq_thread_drop_mm_files(void)
-{
- struct files_struct *files = current->files;
- struct mm_struct *mm = current->mm;
-
- if (mm) {
- kthread_unuse_mm(mm);
- mmput(mm);
- current->mm = NULL;
- }
- if (files) {
- struct nsproxy *nsproxy = current->nsproxy;
-
- task_lock(current);
- current->files = NULL;
- current->nsproxy = NULL;
- task_unlock(current);
- put_files_struct(files);
- put_nsproxy(nsproxy);
- }
-}
-
-static int __io_sq_thread_acquire_files(struct io_ring_ctx *ctx)
-{
- if (!current->files) {
- struct files_struct *files;
- struct nsproxy *nsproxy;
-
- task_lock(ctx->sqo_task);
- files = ctx->sqo_task->files;
- if (!files) {
- task_unlock(ctx->sqo_task);
- return -EOWNERDEAD;
- }
- atomic_inc(&files->count);
- get_nsproxy(ctx->sqo_task->nsproxy);
- nsproxy = ctx->sqo_task->nsproxy;
- task_unlock(ctx->sqo_task);
-
- task_lock(current);
- current->files = files;
- current->nsproxy = nsproxy;
- task_unlock(current);
- }
- return 0;
-}
-
-static int __io_sq_thread_acquire_mm(struct io_ring_ctx *ctx)
-{
- struct mm_struct *mm;
-
- if (current->mm)
- return 0;
-
- task_lock(ctx->sqo_task);
- mm = ctx->sqo_task->mm;
- if (unlikely(!mm || !mmget_not_zero(mm)))
- mm = NULL;
- task_unlock(ctx->sqo_task);
-
- if (mm) {
- kthread_use_mm(mm);
- return 0;
- }
-
- return -EFAULT;
-}
-
-static int __io_sq_thread_acquire_mm_files(struct io_ring_ctx *ctx,
- struct io_kiocb *req)
-{
- const struct io_op_def *def = &io_op_defs[req->opcode];
- int ret;
-
- if (def->work_flags & IO_WQ_WORK_MM) {
- ret = __io_sq_thread_acquire_mm(ctx);
- if (unlikely(ret))
- return ret;
- }
-
- if (def->needs_file || (def->work_flags & IO_WQ_WORK_FILES)) {
- ret = __io_sq_thread_acquire_files(ctx);
- if (unlikely(ret))
- return ret;
- }
-
- return 0;
-}
-
-static inline int io_sq_thread_acquire_mm_files(struct io_ring_ctx *ctx,
- struct io_kiocb *req)
-{
- if (!(ctx->flags & IORING_SETUP_SQPOLL))
- return 0;
- return __io_sq_thread_acquire_mm_files(ctx, req);
-}
-
-static void io_sq_thread_associate_blkcg(struct io_ring_ctx *ctx,
- struct cgroup_subsys_state **cur_css)
-
-{
-#ifdef CONFIG_BLK_CGROUP
- /* puts the old one when swapping */
- if (*cur_css != ctx->sqo_blkcg_css) {
- kthread_associate_blkcg(ctx->sqo_blkcg_css);
- *cur_css = ctx->sqo_blkcg_css;
- }
-#endif
-}
-
-static void io_sq_thread_unassociate_blkcg(void)
-{
-#ifdef CONFIG_BLK_CGROUP
- kthread_associate_blkcg(NULL);
-#endif
-}
-
static inline void req_set_fail_links(struct io_kiocb *req)
{
if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
req->flags |= REQ_F_FAIL_LINK;
}
-/*
- * None of these are dereferenced, they are simply used to check if any of
- * them have changed. If we're under current and check they are still the
- * same, we're fine to grab references to them for actual out-of-line use.
- */
-static void io_init_identity(struct io_identity *id)
-{
- id->files = current->files;
- id->mm = current->mm;
-#ifdef CONFIG_BLK_CGROUP
- rcu_read_lock();
- id->blkcg_css = blkcg_css();
- rcu_read_unlock();
-#endif
- id->creds = current_cred();
- id->nsproxy = current->nsproxy;
- id->fs = current->fs;
- id->fsize = rlimit(RLIMIT_FSIZE);
-#ifdef CONFIG_AUDIT
- id->loginuid = current->loginuid;
- id->sessionid = current->sessionid;
-#endif
- refcount_set(&id->count, 1);
-}
-
-static inline void __io_req_init_async(struct io_kiocb *req)
-{
- memset(&req->work, 0, sizeof(req->work));
- req->flags |= REQ_F_WORK_INITIALIZED;
-}
-
-/*
- * Note: must call io_req_init_async() for the first time you
- * touch any members of io_wq_work.
- */
-static inline void io_req_init_async(struct io_kiocb *req)
-{
- struct io_uring_task *tctx = current->io_uring;
-
- if (req->flags & REQ_F_WORK_INITIALIZED)
- return;
-
- __io_req_init_async(req);
-
- /* Grab a ref if this isn't our static identity */
- req->work.identity = tctx->identity;
- if (tctx->identity != &tctx->__identity)
- refcount_inc(&req->work.identity->count);
-}
-
static void io_ring_ctx_ref_free(struct percpu_ref *ref)
{
struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
return false;
}
-static void io_put_identity(struct io_uring_task *tctx, struct io_kiocb *req)
-{
- if (req->work.identity == &tctx->__identity)
- return;
- if (refcount_dec_and_test(&req->work.identity->count))
- kfree(req->work.identity);
-}
-
static void io_req_clean_work(struct io_kiocb *req)
{
- if (!(req->flags & REQ_F_WORK_INITIALIZED))
- return;
-
- if (req->work.flags & IO_WQ_WORK_MM)
- mmdrop(req->work.identity->mm);
-#ifdef CONFIG_BLK_CGROUP
- if (req->work.flags & IO_WQ_WORK_BLKCG)
- css_put(req->work.identity->blkcg_css);
-#endif
- if (req->work.flags & IO_WQ_WORK_CREDS)
- put_cred(req->work.identity->creds);
- if (req->work.flags & IO_WQ_WORK_FS) {
- struct fs_struct *fs = req->work.identity->fs;
-
- spin_lock(&req->work.identity->fs->lock);
- if (--fs->users)
- fs = NULL;
- spin_unlock(&req->work.identity->fs->lock);
- if (fs)
- free_fs_struct(fs);
- }
- if (req->work.flags & IO_WQ_WORK_FILES) {
- put_files_struct(req->work.identity->files);
- put_nsproxy(req->work.identity->nsproxy);
- }
if (req->flags & REQ_F_INFLIGHT) {
struct io_ring_ctx *ctx = req->ctx;
struct io_uring_task *tctx = req->task->io_uring;
if (atomic_read(&tctx->in_idle))
wake_up(&tctx->wait);
}
-
- req->flags &= ~REQ_F_WORK_INITIALIZED;
- req->work.flags &= ~(IO_WQ_WORK_MM | IO_WQ_WORK_BLKCG | IO_WQ_WORK_FS |
- IO_WQ_WORK_CREDS | IO_WQ_WORK_FILES);
- io_put_identity(req->task->io_uring, req);
-}
-
-/*
- * Create a private copy of io_identity, since some fields don't match
- * the current context.
- */
-static bool io_identity_cow(struct io_kiocb *req)
-{
- struct io_uring_task *tctx = current->io_uring;
- const struct cred *creds = NULL;
- struct io_identity *id;
-
- if (req->work.flags & IO_WQ_WORK_CREDS)
- creds = req->work.identity->creds;
-
- id = kmemdup(req->work.identity, sizeof(*id), GFP_KERNEL);
- if (unlikely(!id)) {
- req->work.flags |= IO_WQ_WORK_CANCEL;
- return false;
- }
-
- /*
- * We can safely just re-init the creds we copied Either the field
- * matches the current one, or we haven't grabbed it yet. The only
- * exception is ->creds, through registered personalities, so handle
- * that one separately.
- */
- io_init_identity(id);
- if (creds)
- id->creds = creds;
-
- /* add one for this request */
- refcount_inc(&id->count);
-
- /* drop tctx and req identity references, if needed */
- if (tctx->identity != &tctx->__identity &&
- refcount_dec_and_test(&tctx->identity->count))
- kfree(tctx->identity);
- if (req->work.identity != &tctx->__identity &&
- refcount_dec_and_test(&req->work.identity->count))
- kfree(req->work.identity);
-
- req->work.identity = id;
- tctx->identity = id;
- return true;
}
static void io_req_track_inflight(struct io_kiocb *req)
struct io_ring_ctx *ctx = req->ctx;
if (!(req->flags & REQ_F_INFLIGHT)) {
- io_req_init_async(req);
req->flags |= REQ_F_INFLIGHT;
spin_lock_irq(&ctx->inflight_lock);
}
}
-static bool io_grab_identity(struct io_kiocb *req)
-{
- const struct io_op_def *def = &io_op_defs[req->opcode];
- struct io_identity *id = req->work.identity;
-
- if (def->work_flags & IO_WQ_WORK_FSIZE) {
- if (id->fsize != rlimit(RLIMIT_FSIZE))
- return false;
- req->work.flags |= IO_WQ_WORK_FSIZE;
- }
-#ifdef CONFIG_BLK_CGROUP
- if (!(req->work.flags & IO_WQ_WORK_BLKCG) &&
- (def->work_flags & IO_WQ_WORK_BLKCG)) {
- rcu_read_lock();
- if (id->blkcg_css != blkcg_css()) {
- rcu_read_unlock();
- return false;
- }
- /*
- * This should be rare, either the cgroup is dying or the task
- * is moving cgroups. Just punt to root for the handful of ios.
- */
- if (css_tryget_online(id->blkcg_css))
- req->work.flags |= IO_WQ_WORK_BLKCG;
- rcu_read_unlock();
- }
-#endif
- if (!(req->work.flags & IO_WQ_WORK_CREDS)) {
- if (id->creds != current_cred())
- return false;
- get_cred(id->creds);
- req->work.flags |= IO_WQ_WORK_CREDS;
- }
-#ifdef CONFIG_AUDIT
- if (!uid_eq(current->loginuid, id->loginuid) ||
- current->sessionid != id->sessionid)
- return false;
-#endif
- if (!(req->work.flags & IO_WQ_WORK_FS) &&
- (def->work_flags & IO_WQ_WORK_FS)) {
- if (current->fs != id->fs)
- return false;
- spin_lock(&id->fs->lock);
- if (!id->fs->in_exec) {
- id->fs->users++;
- req->work.flags |= IO_WQ_WORK_FS;
- } else {
- req->work.flags |= IO_WQ_WORK_CANCEL;
- }
- spin_unlock(¤t->fs->lock);
- }
- if (!(req->work.flags & IO_WQ_WORK_FILES) &&
- (def->work_flags & IO_WQ_WORK_FILES) &&
- !(req->flags & REQ_F_NO_FILE_TABLE)) {
- if (id->files != current->files ||
- id->nsproxy != current->nsproxy)
- return false;
- atomic_inc(&id->files->count);
- get_nsproxy(id->nsproxy);
- req->work.flags |= IO_WQ_WORK_FILES;
- io_req_track_inflight(req);
- }
- if (!(req->work.flags & IO_WQ_WORK_MM) &&
- (def->work_flags & IO_WQ_WORK_MM)) {
- if (id->mm != current->mm)
- return false;
- mmgrab(id->mm);
- req->work.flags |= IO_WQ_WORK_MM;
- }
-
- return true;
-}
-
static void io_prep_async_work(struct io_kiocb *req)
{
const struct io_op_def *def = &io_op_defs[req->opcode];
struct io_ring_ctx *ctx = req->ctx;
- io_req_init_async(req);
-
if (req->flags & REQ_F_FORCE_ASYNC)
req->work.flags |= IO_WQ_WORK_CONCURRENT;
if (def->unbound_nonreg_file)
req->work.flags |= IO_WQ_WORK_UNBOUND;
}
-
- /* if we fail grabbing identity, we must COW, regrab, and retry */
- if (io_grab_identity(req))
- return;
-
- if (!io_identity_cow(req))
- return;
-
- /* can't fail at this point */
- if (!io_grab_identity(req))
- WARN_ON(1);
}
static void io_prep_async_link(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
struct io_kiocb *link = io_prep_linked_timeout(req);
+ struct io_uring_task *tctx = req->task->io_uring;
+
+ BUG_ON(!tctx);
+ BUG_ON(!tctx->io_wq);
trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req,
&req->work, req->flags);
- io_wq_enqueue(ctx->io_wq, &req->work);
+ io_wq_enqueue(tctx->io_wq, &req->work);
return link;
}
trace_io_uring_fail_link(req, link);
io_cqring_fill_event(link, -ECANCELED);
- /*
- * It's ok to free under spinlock as they're not linked anymore,
- * but avoid REQ_F_WORK_INITIALIZED because it may deadlock on
- * work.fs->lock.
- */
- if (link->flags & REQ_F_WORK_INITIALIZED)
- io_put_req_deferred(link, 2);
- else
- io_double_put_req(link);
+ io_put_req_deferred(link, 2);
link = nxt;
}
io_commit_cqring(ctx);
return __io_req_find_next(req);
}
+static void ctx_flush_and_put(struct io_ring_ctx *ctx)
+{
+ if (!ctx)
+ return;
+ if (ctx->submit_state.comp.nr) {
+ mutex_lock(&ctx->uring_lock);
+ io_submit_flush_completions(&ctx->submit_state.comp, ctx);
+ mutex_unlock(&ctx->uring_lock);
+ }
+ percpu_ref_put(&ctx->refs);
+}
+
static bool __tctx_task_work(struct io_uring_task *tctx)
{
struct io_ring_ctx *ctx = NULL;
node = list.first;
while (node) {
struct io_wq_work_node *next = node->next;
- struct io_ring_ctx *this_ctx;
struct io_kiocb *req;
req = container_of(node, struct io_kiocb, io_task_work.node);
- this_ctx = req->ctx;
- req->task_work.func(&req->task_work);
- node = next;
-
- if (!ctx) {
- ctx = this_ctx;
- } else if (ctx != this_ctx) {
- mutex_lock(&ctx->uring_lock);
- io_submit_flush_completions(&ctx->submit_state.comp, ctx);
- mutex_unlock(&ctx->uring_lock);
- ctx = this_ctx;
+ if (req->ctx != ctx) {
+ ctx_flush_and_put(ctx);
+ ctx = req->ctx;
+ percpu_ref_get(&ctx->refs);
}
- }
- if (ctx && ctx->submit_state.comp.nr) {
- mutex_lock(&ctx->uring_lock);
- io_submit_flush_completions(&ctx->submit_state.comp, ctx);
- mutex_unlock(&ctx->uring_lock);
+ req->task_work.func(&req->task_work);
+ node = next;
}
+ ctx_flush_and_put(ctx);
return list.first != NULL;
}
{
struct io_uring_task *tctx = container_of(cb, struct io_uring_task, task_work);
+ clear_bit(0, &tctx->task_state);
+
while (__tctx_task_work(tctx))
cond_resched();
-
- clear_bit(0, &tctx->task_state);
}
static int io_task_work_add(struct task_struct *tsk, struct io_kiocb *req,
static void io_req_task_work_add_fallback(struct io_kiocb *req,
task_work_func_t cb)
{
- struct task_struct *tsk = io_wq_get_task(req->ctx->io_wq);
+ struct io_ring_ctx *ctx = req->ctx;
+ struct callback_head *head;
init_task_work(&req->task_work, cb);
- task_work_add(tsk, &req->task_work, TWA_NONE);
- wake_up_process(tsk);
+ do {
+ head = READ_ONCE(ctx->exit_task_work);
+ req->task_work.next = head;
+ } while (cmpxchg(&ctx->exit_task_work, head, &req->task_work) != head);
}
static void __io_req_task_cancel(struct io_kiocb *req, int error)
/* ctx stays valid until unlock, even if we drop all ours ctx->refs */
mutex_lock(&ctx->uring_lock);
- if (!ctx->sqo_dead && !(current->flags & PF_EXITING) &&
- !io_sq_thread_acquire_mm_files(ctx, req))
+ if (!(current->flags & PF_EXITING) && !current->in_execve)
__io_queue_sqe(req);
else
__io_req_task_cancel(req, -EFAULT);
return false;
return !io_setup_async_rw(req, iovec, inline_vecs, &iter, false);
}
-#endif
-static bool io_rw_reissue(struct io_kiocb *req)
+static bool io_rw_should_reissue(struct io_kiocb *req)
{
-#ifdef CONFIG_BLOCK
umode_t mode = file_inode(req->file)->i_mode;
- int ret;
+ struct io_ring_ctx *ctx = req->ctx;
if (!S_ISBLK(mode) && !S_ISREG(mode))
return false;
- if ((req->flags & REQ_F_NOWAIT) || io_wq_current_is_worker())
+ if ((req->flags & REQ_F_NOWAIT) || (io_wq_current_is_worker() &&
+ !(ctx->flags & IORING_SETUP_IOPOLL)))
return false;
/*
* If ref is dying, we might be running poll reap from the exit work.
* Don't attempt to reissue from that path, just let it fail with
* -EAGAIN.
*/
- if (percpu_ref_is_dying(&req->ctx->refs))
+ if (percpu_ref_is_dying(&ctx->refs))
return false;
+ return true;
+}
+#endif
- lockdep_assert_held(&req->ctx->uring_lock);
+static bool io_rw_reissue(struct io_kiocb *req)
+{
+#ifdef CONFIG_BLOCK
+ if (!io_rw_should_reissue(req))
+ return false;
- ret = io_sq_thread_acquire_mm_files(req->ctx, req);
+ lockdep_assert_held(&req->ctx->uring_lock);
- if (!ret && io_resubmit_prep(req)) {
+ if (io_resubmit_prep(req)) {
refcount_inc(&req->refs);
io_queue_async_work(req);
return true;
{
struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb);
+#ifdef CONFIG_BLOCK
+ /* Rewind iter, if we have one. iopoll path resubmits as usual */
+ if (res == -EAGAIN && io_rw_should_reissue(req)) {
+ struct io_async_rw *rw = req->async_data;
+
+ if (rw)
+ iov_iter_revert(&rw->iter,
+ req->result - iov_iter_count(&rw->iter));
+ else if (!io_resubmit_prep(req))
+ res = -EIO;
+ }
+#endif
+
if (kiocb->ki_flags & IOCB_WRITE)
kiocb_end_write(req);
ret = io_iter_do_read(req, iter);
if (ret == -EIOCBQUEUED) {
+ if (req->async_data)
+ iov_iter_revert(iter, io_size - iov_iter_count(iter));
goto out_free;
} else if (ret == -EAGAIN) {
/* IOPOLL retry should happen for io-wq threads */
/* no retry on NONBLOCK nor RWF_NOWAIT */
if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT))
goto done;
+ if (ret2 == -EIOCBQUEUED && req->async_data)
+ iov_iter_revert(iter, io_size - iov_iter_count(iter));
if (!force_nonblock || ret2 != -EAGAIN) {
/* IOPOLL retry should happen for io-wq threads */
if ((req->ctx->flags & IORING_SETUP_IOPOLL) && ret2 == -EAGAIN)
* Splice operation will be punted aync, and here need to
* modify io_wq_work.flags, so initialize io_wq_work firstly.
*/
- io_req_init_async(req);
req->work.flags |= IO_WQ_WORK_UNBOUND;
}
pt->error = -EINVAL;
return;
}
+ /* double add on the same waitqueue head, ignore */
+ if (poll->head == head)
+ return;
poll = kmalloc(sizeof(*poll), GFP_ATOMIC);
if (!poll) {
pt->error = -ENOMEM;
return req->user_data == (unsigned long) data;
}
-static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
+static int io_async_cancel_one(struct io_uring_task *tctx, void *sqe_addr)
{
enum io_wq_cancel cancel_ret;
int ret = 0;
- cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr, false);
+ if (!tctx->io_wq)
+ return -ENOENT;
+
+ cancel_ret = io_wq_cancel_cb(tctx->io_wq, io_cancel_cb, sqe_addr, false);
switch (cancel_ret) {
case IO_WQ_CANCEL_OK:
ret = 0;
unsigned long flags;
int ret;
- ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
+ ret = io_async_cancel_one(req->task->io_uring,
+ (void *) (unsigned long) sqe_addr);
if (ret != -ENOENT) {
spin_lock_irqsave(&ctx->completion_lock, flags);
goto done;
static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
{
struct io_ring_ctx *ctx = req->ctx;
+ const struct cred *creds = NULL;
int ret;
+ if (req->work.personality) {
+ const struct cred *new_creds;
+
+ if (!(issue_flags & IO_URING_F_NONBLOCK))
+ mutex_lock(&ctx->uring_lock);
+ new_creds = idr_find(&ctx->personality_idr, req->work.personality);
+ if (!(issue_flags & IO_URING_F_NONBLOCK))
+ mutex_unlock(&ctx->uring_lock);
+ if (!new_creds)
+ return -EINVAL;
+ creds = override_creds(new_creds);
+ }
+
switch (req->opcode) {
case IORING_OP_NOP:
ret = io_nop(req, issue_flags);
break;
}
+ if (creds)
+ revert_creds(creds);
+
if (ret)
return ret;
static void __io_queue_sqe(struct io_kiocb *req)
{
struct io_kiocb *linked_timeout = io_prep_linked_timeout(req);
- const struct cred *old_creds = NULL;
int ret;
- if ((req->flags & REQ_F_WORK_INITIALIZED) &&
- (req->work.flags & IO_WQ_WORK_CREDS) &&
- req->work.identity->creds != current_cred())
- old_creds = override_creds(req->work.identity->creds);
-
ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
- if (old_creds)
- revert_creds(old_creds);
-
/*
* We async punt it if the file wasn't marked NOWAIT, or if the file
* doesn't support non-blocking read/write attempts
{
struct io_submit_state *state;
unsigned int sqe_flags;
- int id, ret = 0;
+ int ret = 0;
req->opcode = READ_ONCE(sqe->opcode);
/* same numerical values with corresponding REQ_F_*, safe to copy */
if (unlikely(req->opcode >= IORING_OP_LAST))
return -EINVAL;
- if (unlikely(io_sq_thread_acquire_mm_files(ctx, req)))
- return -EFAULT;
-
if (unlikely(!io_check_restriction(ctx, req, sqe_flags)))
return -EACCES;
!io_op_defs[req->opcode].buffer_select)
return -EOPNOTSUPP;
- id = READ_ONCE(sqe->personality);
- if (id) {
- struct io_identity *iod;
-
- iod = idr_find(&ctx->personality_idr, id);
- if (unlikely(!iod))
- return -EINVAL;
- refcount_inc(&iod->count);
-
- __io_req_init_async(req);
- get_cred(iod->creds);
- req->work.identity = iod;
- req->work.flags |= IO_WQ_WORK_CREDS;
- }
-
+ req->work.list.next = NULL;
+ req->work.flags = 0;
+ req->work.personality = READ_ONCE(sqe->personality);
state = &ctx->submit_state;
/*
if (!list_empty(&ctx->iopoll_list))
io_do_iopoll(ctx, &nr_events, 0);
- if (to_submit && !ctx->sqo_dead &&
- likely(!percpu_ref_is_dying(&ctx->refs)))
+ if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)))
ret = io_submit_sqes(ctx, to_submit);
mutex_unlock(&ctx->uring_lock);
}
io_sqd_update_thread_idle(sqd);
}
+static bool io_sq_thread_should_stop(struct io_sq_data *sqd)
+{
+ return test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
+}
+
+static bool io_sq_thread_should_park(struct io_sq_data *sqd)
+{
+ return test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
+}
+
+static void io_sq_thread_parkme(struct io_sq_data *sqd)
+{
+ for (;;) {
+ /*
+ * TASK_PARKED is a special state; we must serialize against
+ * possible pending wakeups to avoid store-store collisions on
+ * task->state.
+ *
+ * Such a collision might possibly result in the task state
+ * changin from TASK_PARKED and us failing the
+ * wait_task_inactive() in kthread_park().
+ */
+ set_special_state(TASK_PARKED);
+ if (!test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state))
+ break;
+
+ /*
+ * Thread is going to call schedule(), do not preempt it,
+ * or the caller of kthread_park() may spend more time in
+ * wait_task_inactive().
+ */
+ preempt_disable();
+ complete(&sqd->completion);
+ schedule_preempt_disabled();
+ preempt_enable();
+ }
+ __set_current_state(TASK_RUNNING);
+}
+
static int io_sq_thread(void *data)
{
- struct cgroup_subsys_state *cur_css = NULL;
- struct files_struct *old_files = current->files;
- struct nsproxy *old_nsproxy = current->nsproxy;
- const struct cred *old_cred = NULL;
struct io_sq_data *sqd = data;
struct io_ring_ctx *ctx;
unsigned long timeout = 0;
+ char buf[TASK_COMM_LEN];
DEFINE_WAIT(wait);
- task_lock(current);
- current->files = NULL;
- current->nsproxy = NULL;
- task_unlock(current);
+ sprintf(buf, "iou-sqp-%d", sqd->task_pid);
+ set_task_comm(current, buf);
+ sqd->thread = current;
+ current->pf_io_worker = NULL;
+
+ if (sqd->sq_cpu != -1)
+ set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
+ else
+ set_cpus_allowed_ptr(current, cpu_online_mask);
+ current->flags |= PF_NO_SETAFFINITY;
+
+ complete(&sqd->completion);
- while (!kthread_should_stop()) {
+ wait_for_completion(&sqd->startup);
+
+ while (!io_sq_thread_should_stop(sqd)) {
int ret;
bool cap_entries, sqt_spin, needs_sched;
/*
* Any changes to the sqd lists are synchronized through the
- * kthread parking. This synchronizes the thread vs users,
+ * thread parking. This synchronizes the thread vs users,
* the users are synchronized on the sqd->ctx_lock.
*/
- if (kthread_should_park()) {
- kthread_parkme();
- /*
- * When sq thread is unparked, in case the previous park operation
- * comes from io_put_sq_data(), which means that sq thread is going
- * to be stopped, so here needs to have a check.
- */
- if (kthread_should_stop())
- break;
+ if (io_sq_thread_should_park(sqd)) {
+ io_sq_thread_parkme(sqd);
+ continue;
}
-
if (unlikely(!list_empty(&sqd->ctx_new_list))) {
io_sqd_init_new(sqd);
timeout = jiffies + sqd->sq_thread_idle;
}
-
+ if (fatal_signal_pending(current))
+ break;
sqt_spin = false;
cap_entries = !list_is_singular(&sqd->ctx_list);
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
- if (current->cred != ctx->creds) {
- if (old_cred)
- revert_creds(old_cred);
- old_cred = override_creds(ctx->creds);
- }
- io_sq_thread_associate_blkcg(ctx, &cur_css);
-#ifdef CONFIG_AUDIT
- current->loginuid = ctx->loginuid;
- current->sessionid = ctx->sessionid;
-#endif
-
ret = __io_sq_thread(ctx, cap_entries);
if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list)))
sqt_spin = true;
-
- io_sq_thread_drop_mm_files();
}
if (sqt_spin || !time_after(jiffies, timeout)) {
io_run_task_work();
- io_sq_thread_drop_mm_files();
cond_resched();
if (sqt_spin)
timeout = jiffies + sqd->sq_thread_idle;
}
}
- if (needs_sched && !kthread_should_park()) {
+ if (needs_sched && !io_sq_thread_should_park(sqd)) {
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
io_ring_set_wakeup_flag(ctx);
timeout = jiffies + sqd->sq_thread_idle;
}
- io_run_task_work();
- io_sq_thread_drop_mm_files();
+ list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
+ io_uring_cancel_sqpoll(ctx);
- if (cur_css)
- io_sq_thread_unassociate_blkcg();
- if (old_cred)
- revert_creds(old_cred);
+ io_run_task_work();
- task_lock(current);
- current->files = old_files;
- current->nsproxy = old_nsproxy;
- task_unlock(current);
+ if (io_sq_thread_should_park(sqd))
+ io_sq_thread_parkme(sqd);
- kthread_parkme();
+ /*
+ * Clear thread under lock so that concurrent parks work correctly
+ */
+ complete(&sqd->completion);
+ mutex_lock(&sqd->lock);
+ sqd->thread = NULL;
+ list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
+ ctx->sqo_exec = 1;
+ io_ring_set_wakeup_flag(ctx);
+ }
- return 0;
+ complete(&sqd->exited);
+ mutex_unlock(&sqd->lock);
+ do_exit(0);
}
struct io_wait_queue {
flush_delayed_work(&ctx->rsrc_put_work);
ret = wait_for_completion_interruptible(&data->done);
- if (!ret || !io_refs_resurrect(&data->refs, &data->done))
+ if (!ret)
break;
+ percpu_ref_resurrect(&data->refs);
io_sqe_rsrc_set_node(ctx, data, backup_node);
backup_node = NULL;
+ reinit_completion(&data->done);
mutex_unlock(&ctx->uring_lock);
ret = io_run_task_work_sig();
mutex_lock(&ctx->uring_lock);
return 0;
}
+static void io_sq_thread_unpark(struct io_sq_data *sqd)
+ __releases(&sqd->lock)
+{
+ if (!sqd->thread)
+ return;
+ if (sqd->thread == current)
+ return;
+ clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
+ wake_up_state(sqd->thread, TASK_PARKED);
+ mutex_unlock(&sqd->lock);
+}
+
+static bool io_sq_thread_park(struct io_sq_data *sqd)
+ __acquires(&sqd->lock)
+{
+ if (sqd->thread == current)
+ return true;
+ mutex_lock(&sqd->lock);
+ if (!sqd->thread) {
+ mutex_unlock(&sqd->lock);
+ return false;
+ }
+ set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
+ wake_up_process(sqd->thread);
+ wait_for_completion(&sqd->completion);
+ return true;
+}
+
+static void io_sq_thread_stop(struct io_sq_data *sqd)
+{
+ if (test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state))
+ return;
+ mutex_lock(&sqd->lock);
+ if (sqd->thread) {
+ set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
+ WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state));
+ wake_up_process(sqd->thread);
+ mutex_unlock(&sqd->lock);
+ wait_for_completion(&sqd->exited);
+ WARN_ON_ONCE(sqd->thread);
+ } else {
+ mutex_unlock(&sqd->lock);
+ }
+}
+
static void io_put_sq_data(struct io_sq_data *sqd)
{
if (refcount_dec_and_test(&sqd->refs)) {
- /*
- * The park is a bit of a work-around, without it we get
- * warning spews on shutdown with SQPOLL set and affinity
- * set to a single CPU.
- */
+ io_sq_thread_stop(sqd);
+ kfree(sqd);
+ }
+}
+
+static void io_sq_thread_finish(struct io_ring_ctx *ctx)
+{
+ struct io_sq_data *sqd = ctx->sq_data;
+
+ if (sqd) {
+ complete(&sqd->startup);
if (sqd->thread) {
- kthread_park(sqd->thread);
- kthread_stop(sqd->thread);
+ wait_for_completion(&ctx->sq_thread_comp);
+ io_sq_thread_park(sqd);
}
- kfree(sqd);
+ mutex_lock(&sqd->ctx_lock);
+ list_del(&ctx->sqd_list);
+ io_sqd_update_thread_idle(sqd);
+ mutex_unlock(&sqd->ctx_lock);
+
+ if (sqd->thread)
+ io_sq_thread_unpark(sqd);
+
+ io_put_sq_data(sqd);
+ ctx->sq_data = NULL;
}
}
mutex_init(&sqd->ctx_lock);
mutex_init(&sqd->lock);
init_waitqueue_head(&sqd->wait);
+ init_completion(&sqd->startup);
+ init_completion(&sqd->completion);
+ init_completion(&sqd->exited);
return sqd;
}
-static void io_sq_thread_unpark(struct io_sq_data *sqd)
- __releases(&sqd->lock)
-{
- if (!sqd->thread)
- return;
- kthread_unpark(sqd->thread);
- mutex_unlock(&sqd->lock);
-}
-
-static void io_sq_thread_park(struct io_sq_data *sqd)
- __acquires(&sqd->lock)
-{
- if (!sqd->thread)
- return;
- mutex_lock(&sqd->lock);
- kthread_park(sqd->thread);
-}
-
-static void io_sq_thread_stop(struct io_ring_ctx *ctx)
-{
- struct io_sq_data *sqd = ctx->sq_data;
-
- if (sqd) {
- if (sqd->thread) {
- /*
- * We may arrive here from the error branch in
- * io_sq_offload_create() where the kthread is created
- * without being waked up, thus wake it up now to make
- * sure the wait will complete.
- */
- wake_up_process(sqd->thread);
- wait_for_completion(&ctx->sq_thread_comp);
-
- io_sq_thread_park(sqd);
- }
-
- mutex_lock(&sqd->ctx_lock);
- list_del(&ctx->sqd_list);
- io_sqd_update_thread_idle(sqd);
- mutex_unlock(&sqd->ctx_lock);
-
- if (sqd->thread)
- io_sq_thread_unpark(sqd);
-
- io_put_sq_data(sqd);
- ctx->sq_data = NULL;
- }
-}
-
-static void io_finish_async(struct io_ring_ctx *ctx)
-{
- io_sq_thread_stop(ctx);
-
- if (ctx->io_wq) {
- io_wq_destroy(ctx->io_wq);
- ctx->io_wq = NULL;
- }
-}
-
#if defined(CONFIG_UNIX)
/*
* Ensure the UNIX gc is aware of our file set, so we are certain that
skb->sk = sk;
nr_files = 0;
- fpl->user = get_uid(ctx->user);
+ fpl->user = get_uid(current_user());
for (i = 0; i < nr; i++) {
struct file *file = io_file_from_index(ctx, i + offset);
return req ? &req->work : NULL;
}
-static int io_init_wq_offload(struct io_ring_ctx *ctx,
- struct io_uring_params *p)
+static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx)
{
+ struct io_wq_hash *hash;
struct io_wq_data data;
- struct fd f;
- struct io_ring_ctx *ctx_attach;
unsigned int concurrency;
- int ret = 0;
- data.user = ctx->user;
- data.free_work = io_free_work;
- data.do_work = io_wq_submit_work;
-
- if (!(p->flags & IORING_SETUP_ATTACH_WQ)) {
- /* Do QD, or 4 * CPUS, whatever is smallest */
- concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
-
- ctx->io_wq = io_wq_create(concurrency, &data);
- if (IS_ERR(ctx->io_wq)) {
- ret = PTR_ERR(ctx->io_wq);
- ctx->io_wq = NULL;
- }
- return ret;
+ hash = ctx->hash_map;
+ if (!hash) {
+ hash = kzalloc(sizeof(*hash), GFP_KERNEL);
+ if (!hash)
+ return ERR_PTR(-ENOMEM);
+ refcount_set(&hash->refs, 1);
+ init_waitqueue_head(&hash->wait);
+ ctx->hash_map = hash;
}
- f = fdget(p->wq_fd);
- if (!f.file)
- return -EBADF;
-
- if (f.file->f_op != &io_uring_fops) {
- ret = -EINVAL;
- goto out_fput;
- }
+ data.hash = hash;
+ data.free_work = io_free_work;
+ data.do_work = io_wq_submit_work;
- ctx_attach = f.file->private_data;
- /* @io_wq is protected by holding the fd */
- if (!io_wq_get(ctx_attach->io_wq, &data)) {
- ret = -EINVAL;
- goto out_fput;
- }
+ /* Do QD, or 4 * CPUS, whatever is smallest */
+ concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
- ctx->io_wq = ctx_attach->io_wq;
-out_fput:
- fdput(f);
- return ret;
+ return io_wq_create(concurrency, &data);
}
-static int io_uring_alloc_task_context(struct task_struct *task)
+static int io_uring_alloc_task_context(struct task_struct *task,
+ struct io_ring_ctx *ctx)
{
struct io_uring_task *tctx;
int ret;
return ret;
}
+ tctx->io_wq = io_init_wq_offload(ctx);
+ if (IS_ERR(tctx->io_wq)) {
+ ret = PTR_ERR(tctx->io_wq);
+ percpu_counter_destroy(&tctx->inflight);
+ kfree(tctx);
+ return ret;
+ }
+
xa_init(&tctx->xa);
init_waitqueue_head(&tctx->wait);
tctx->last = NULL;
atomic_set(&tctx->in_idle, 0);
tctx->sqpoll = false;
- io_init_identity(&tctx->__identity);
- tctx->identity = &tctx->__identity;
task->io_uring = tctx;
spin_lock_init(&tctx->task_lock);
INIT_WQ_LIST(&tctx->task_list);
struct io_uring_task *tctx = tsk->io_uring;
WARN_ON_ONCE(!xa_empty(&tctx->xa));
- WARN_ON_ONCE(refcount_read(&tctx->identity->count) != 1);
- if (tctx->identity != &tctx->__identity)
- kfree(tctx->identity);
+ WARN_ON_ONCE(tctx->io_wq);
+
percpu_counter_destroy(&tctx->inflight);
kfree(tctx);
tsk->io_uring = NULL;
}
+static int io_sq_thread_fork(struct io_sq_data *sqd, struct io_ring_ctx *ctx)
+{
+ int ret;
+
+ clear_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
+ reinit_completion(&sqd->completion);
+ ctx->sqo_exec = 0;
+ sqd->task_pid = current->pid;
+ current->flags |= PF_IO_WORKER;
+ ret = io_wq_fork_thread(io_sq_thread, sqd);
+ current->flags &= ~PF_IO_WORKER;
+ if (ret < 0) {
+ sqd->thread = NULL;
+ return ret;
+ }
+ wait_for_completion(&sqd->completion);
+ return io_uring_alloc_task_context(sqd->thread, ctx);
+}
+
static int io_sq_offload_create(struct io_ring_ctx *ctx,
struct io_uring_params *p)
{
int ret;
+ /* Retain compatibility with failing for an invalid attach attempt */
+ if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
+ IORING_SETUP_ATTACH_WQ) {
+ struct fd f;
+
+ f = fdget(p->wq_fd);
+ if (!f.file)
+ return -ENXIO;
+ if (f.file->f_op != &io_uring_fops) {
+ fdput(f);
+ return -EINVAL;
+ }
+ fdput(f);
+ }
if (ctx->flags & IORING_SETUP_SQPOLL) {
struct io_sq_data *sqd;
ctx->sq_thread_idle = HZ;
if (sqd->thread)
- goto done;
+ return 0;
if (p->flags & IORING_SETUP_SQ_AFF) {
int cpu = p->sq_thread_cpu;
if (!cpu_online(cpu))
goto err;
- sqd->thread = kthread_create_on_cpu(io_sq_thread, sqd,
- cpu, "io_uring-sq");
+ sqd->sq_cpu = cpu;
} else {
- sqd->thread = kthread_create(io_sq_thread, sqd,
- "io_uring-sq");
+ sqd->sq_cpu = -1;
}
- if (IS_ERR(sqd->thread)) {
- ret = PTR_ERR(sqd->thread);
+
+ sqd->task_pid = current->pid;
+ current->flags |= PF_IO_WORKER;
+ ret = io_wq_fork_thread(io_sq_thread, sqd);
+ current->flags &= ~PF_IO_WORKER;
+ if (ret < 0) {
sqd->thread = NULL;
goto err;
}
- ret = io_uring_alloc_task_context(sqd->thread);
+ wait_for_completion(&sqd->completion);
+ ret = io_uring_alloc_task_context(sqd->thread, ctx);
if (ret)
goto err;
} else if (p->flags & IORING_SETUP_SQ_AFF) {
goto err;
}
-done:
- ret = io_init_wq_offload(ctx, p);
- if (ret)
- goto err;
-
return 0;
err:
- io_finish_async(ctx);
+ io_sq_thread_finish(ctx);
return ret;
}
{
struct io_sq_data *sqd = ctx->sq_data;
- if ((ctx->flags & IORING_SETUP_SQPOLL) && sqd->thread)
- wake_up_process(sqd->thread);
+ ctx->flags &= ~IORING_SETUP_R_DISABLED;
+ if (ctx->flags & IORING_SETUP_SQPOLL)
+ complete(&sqd->startup);
}
static inline void __io_unaccount_mem(struct user_struct *user,
static void io_unaccount_mem(struct io_ring_ctx *ctx, unsigned long nr_pages)
{
- if (ctx->limit_mem)
+ if (ctx->user)
__io_unaccount_mem(ctx->user, nr_pages);
if (ctx->mm_account)
{
int ret;
- if (ctx->limit_mem) {
+ if (ctx->user) {
ret = __io_account_mem(ctx->user, nr_pages);
if (ret)
return ret;
}
}
-static void io_req_caches_free(struct io_ring_ctx *ctx, struct task_struct *tsk)
+static void io_req_caches_free(struct io_ring_ctx *ctx)
{
struct io_submit_state *submit_state = &ctx->submit_state;
struct io_comp_state *cs = &ctx->submit_state.comp;
mutex_lock(&ctx->uring_lock);
mutex_unlock(&ctx->uring_lock);
- io_finish_async(ctx);
+ io_sq_thread_finish(ctx);
io_sqe_buffers_unregister(ctx);
- if (ctx->sqo_task) {
- put_task_struct(ctx->sqo_task);
- ctx->sqo_task = NULL;
+ if (ctx->mm_account) {
mmdrop(ctx->mm_account);
ctx->mm_account = NULL;
}
-#ifdef CONFIG_BLK_CGROUP
- if (ctx->sqo_blkcg_css)
- css_put(ctx->sqo_blkcg_css);
-#endif
-
mutex_lock(&ctx->uring_lock);
io_sqe_files_unregister(ctx);
mutex_unlock(&ctx->uring_lock);
percpu_ref_exit(&ctx->refs);
free_uid(ctx->user);
- put_cred(ctx->creds);
- io_req_caches_free(ctx, NULL);
+ io_req_caches_free(ctx);
+ if (ctx->hash_map)
+ io_wq_put_hash(ctx->hash_map);
kfree(ctx->cancel_hash);
kfree(ctx);
}
static int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id)
{
- struct io_identity *iod;
+ const struct cred *creds;
- iod = idr_remove(&ctx->personality_idr, id);
- if (iod) {
- put_cred(iod->creds);
- if (refcount_dec_and_test(&iod->count))
- kfree(iod);
+ creds = idr_remove(&ctx->personality_idr, id);
+ if (creds) {
+ put_cred(creds);
return 0;
}
return 0;
}
+static bool io_run_ctx_fallback(struct io_ring_ctx *ctx)
+{
+ struct callback_head *work, *next;
+ bool executed = false;
+
+ do {
+ work = xchg(&ctx->exit_task_work, NULL);
+ if (!work)
+ break;
+
+ do {
+ next = work->next;
+ work->func(work);
+ work = next;
+ cond_resched();
+ } while (work);
+ executed = true;
+ } while (1);
+
+ return executed;
+}
+
static void io_ring_exit_work(struct work_struct *work)
{
struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
io_ring_ctx_free(ctx);
}
-static bool io_cancel_ctx_cb(struct io_wq_work *work, void *data)
-{
- struct io_kiocb *req = container_of(work, struct io_kiocb, work);
-
- return req->ctx == data;
-}
-
static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
{
mutex_lock(&ctx->uring_lock);
percpu_ref_kill(&ctx->refs);
-
- if (WARN_ON_ONCE((ctx->flags & IORING_SETUP_SQPOLL) && !ctx->sqo_dead))
- ctx->sqo_dead = 1;
-
/* if force is set, the ring is going away. always drop after that */
ctx->cq_overflow_flushed = 1;
if (ctx->rings)
io_kill_timeouts(ctx, NULL, NULL);
io_poll_remove_all(ctx, NULL, NULL);
- if (ctx->io_wq)
- io_wq_cancel_cb(ctx->io_wq, io_cancel_ctx_cb, ctx, true);
-
/* if we failed setting up the ctx, we might not have any rings */
io_iopoll_try_reap_events(ctx);
struct files_struct *files)
{
struct io_task_cancel cancel = { .task = task, .files = files, };
+ struct io_uring_task *tctx = current->io_uring;
while (1) {
enum io_wq_cancel cret;
bool ret = false;
- if (ctx->io_wq) {
- cret = io_wq_cancel_cb(ctx->io_wq, io_cancel_task_cb,
+ if (tctx && tctx->io_wq) {
+ cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_task_cb,
&cancel, true);
ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
}
ret |= io_poll_remove_all(ctx, task, files);
ret |= io_kill_timeouts(ctx, task, files);
ret |= io_run_task_work();
+ ret |= io_run_ctx_fallback(ctx);
io_cqring_overflow_flush(ctx, true, task, files);
if (!ret)
break;
}
}
-static void io_disable_sqo_submit(struct io_ring_ctx *ctx)
-{
- mutex_lock(&ctx->uring_lock);
- ctx->sqo_dead = 1;
- mutex_unlock(&ctx->uring_lock);
-
- /* make sure callers enter the ring to get error */
- if (ctx->rings)
- io_ring_set_wakeup_flag(ctx);
-}
-
/*
* We need to iteratively cancel requests, in case a request has dependent
* hard links. These persist even for failure of cancelations, hence keep
struct files_struct *files)
{
struct task_struct *task = current;
+ bool did_park = false;
if ((ctx->flags & IORING_SETUP_SQPOLL) && ctx->sq_data) {
- io_disable_sqo_submit(ctx);
- task = ctx->sq_data->thread;
- atomic_inc(&task->io_uring->in_idle);
- io_sq_thread_park(ctx->sq_data);
+ /* never started, nothing to cancel */
+ if (ctx->flags & IORING_SETUP_R_DISABLED) {
+ io_sq_offload_start(ctx);
+ return;
+ }
+ did_park = io_sq_thread_park(ctx->sq_data);
+ if (did_park) {
+ task = ctx->sq_data->thread;
+ atomic_inc(&task->io_uring->in_idle);
+ }
}
io_cancel_defer_files(ctx, task, files);
if (!files)
io_uring_try_cancel_requests(ctx, task, NULL);
- if ((ctx->flags & IORING_SETUP_SQPOLL) && ctx->sq_data) {
+ if (did_park) {
atomic_dec(&task->io_uring->in_idle);
- /*
- * If the files that are going away are the ones in the thread
- * identity, clear them out.
- */
- if (task->io_uring->identity->files == files)
- task->io_uring->identity->files = NULL;
io_sq_thread_unpark(ctx->sq_data);
}
}
int ret;
if (unlikely(!tctx)) {
- ret = io_uring_alloc_task_context(current);
+ ret = io_uring_alloc_task_context(current, ctx);
if (unlikely(ret))
return ret;
tctx = current->io_uring;
fput(file);
return ret;
}
-
- /* one and only SQPOLL file note, held by sqo_task */
- WARN_ON_ONCE((ctx->flags & IORING_SETUP_SQPOLL) &&
- current != ctx->sqo_task);
}
tctx->last = file;
}
fput(file);
}
-static void io_uring_remove_task_files(struct io_uring_task *tctx)
+static void io_uring_clean_tctx(struct io_uring_task *tctx)
{
struct file *file;
unsigned long index;
xa_for_each(&tctx->xa, index, file)
io_uring_del_task_file(file);
+ if (tctx->io_wq) {
+ io_wq_put_and_exit(tctx->io_wq);
+ tctx->io_wq = NULL;
+ }
}
void __io_uring_files_cancel(struct files_struct *files)
atomic_dec(&tctx->in_idle);
if (files)
- io_uring_remove_task_files(tctx);
+ io_uring_clean_tctx(tctx);
}
static s64 tctx_inflight(struct io_uring_task *tctx)
static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx)
{
+ struct io_sq_data *sqd = ctx->sq_data;
struct io_uring_task *tctx;
s64 inflight;
DEFINE_WAIT(wait);
- if (!ctx->sq_data)
+ if (!sqd)
+ return;
+ if (!io_sq_thread_park(sqd))
return;
tctx = ctx->sq_data->thread->io_uring;
- io_disable_sqo_submit(ctx);
+ /* can happen on fork/alloc failure, just ignore that state */
+ if (!tctx) {
+ io_sq_thread_unpark(sqd);
+ return;
+ }
atomic_inc(&tctx->in_idle);
do {
finish_wait(&tctx->wait, &wait);
} while (1);
atomic_dec(&tctx->in_idle);
+ io_sq_thread_unpark(sqd);
}
/*
/* make sure overflow events are dropped */
atomic_inc(&tctx->in_idle);
- /* trigger io_disable_sqo_submit() */
if (tctx->sqpoll) {
struct file *file;
unsigned long index;
atomic_dec(&tctx->in_idle);
- io_uring_remove_task_files(tctx);
-}
-
-static int io_uring_flush(struct file *file, void *data)
-{
- struct io_uring_task *tctx = current->io_uring;
- struct io_ring_ctx *ctx = file->private_data;
-
- if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
- io_uring_cancel_task_requests(ctx, NULL);
- io_req_caches_free(ctx, current);
- }
-
- if (!tctx)
- return 0;
-
- /* we should have cancelled and erased it before PF_EXITING */
- WARN_ON_ONCE((current->flags & PF_EXITING) &&
- xa_load(&tctx->xa, (unsigned long)file));
-
- /*
- * fput() is pending, will be 2 if the only other ref is our potential
- * task file note. If the task is exiting, drop regardless of count.
- */
- if (atomic_long_read(&file->f_count) != 2)
- return 0;
-
- if (ctx->flags & IORING_SETUP_SQPOLL) {
- /* there is only one file note, which is owned by sqo_task */
- WARN_ON_ONCE(ctx->sqo_task != current &&
- xa_load(&tctx->xa, (unsigned long)file));
- /* sqo_dead check is for when this happens after cancellation */
- WARN_ON_ONCE(ctx->sqo_task == current && !ctx->sqo_dead &&
- !xa_load(&tctx->xa, (unsigned long)file));
-
- io_disable_sqo_submit(ctx);
- }
-
- if (!(ctx->flags & IORING_SETUP_SQPOLL) || ctx->sqo_task == current)
- io_uring_del_task_file(file);
- return 0;
+ io_uring_clean_tctx(tctx);
+ /* all current's requests should be gone, we can kill tctx */
+ __io_uring_free(current);
}
static void *io_uring_validate_mmap_request(struct file *file,
do {
if (!io_sqring_full(ctx))
break;
-
prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);
- if (unlikely(ctx->sqo_dead)) {
- ret = -EOWNERDEAD;
- goto out;
- }
-
if (!io_sqring_full(ctx))
break;
-
schedule();
} while (!signal_pending(current));
finish_wait(&ctx->sqo_sq_wait, &wait);
-out:
return ret;
}
if (ctx->flags & IORING_SETUP_SQPOLL) {
io_cqring_overflow_flush(ctx, false, NULL, NULL);
+ if (unlikely(ctx->sqo_exec)) {
+ ret = io_sq_thread_fork(ctx->sq_data, ctx);
+ if (ret)
+ goto out;
+ ctx->sqo_exec = 0;
+ }
ret = -EOWNERDEAD;
- if (unlikely(ctx->sqo_dead))
- goto out;
if (flags & IORING_ENTER_SQ_WAKEUP)
wake_up(&ctx->sq_data->wait);
if (flags & IORING_ENTER_SQ_WAIT) {
#ifdef CONFIG_PROC_FS
static int io_uring_show_cred(int id, void *p, void *data)
{
- struct io_identity *iod = p;
- const struct cred *cred = iod->creds;
+ const struct cred *cred = p;
struct seq_file *m = data;
struct user_namespace *uns = seq_user_ns(m);
struct group_info *gi;
*/
has_lock = mutex_trylock(&ctx->uring_lock);
- if (has_lock && (ctx->flags & IORING_SETUP_SQPOLL))
+ if (has_lock && (ctx->flags & IORING_SETUP_SQPOLL)) {
sq = ctx->sq_data;
+ if (!sq->thread)
+ sq = NULL;
+ }
seq_printf(m, "SqThread:\t%d\n", sq ? task_pid_nr(sq->thread) : -1);
seq_printf(m, "SqThreadCpu:\t%d\n", sq ? task_cpu(sq->thread) : -1);
static const struct file_operations io_uring_fops = {
.release = io_uring_release,
- .flush = io_uring_flush,
.mmap = io_uring_mmap,
#ifndef CONFIG_MMU
.get_unmapped_area = io_uring_nommu_get_unmapped_area,
static int io_uring_create(unsigned entries, struct io_uring_params *p,
struct io_uring_params __user *params)
{
- struct user_struct *user = NULL;
struct io_ring_ctx *ctx;
struct file *file;
int ret;
p->cq_entries = 2 * p->sq_entries;
}
- user = get_uid(current_user());
-
ctx = io_ring_ctx_alloc(p);
- if (!ctx) {
- free_uid(user);
+ if (!ctx)
return -ENOMEM;
- }
ctx->compat = in_compat_syscall();
- ctx->limit_mem = !capable(CAP_IPC_LOCK);
- ctx->user = user;
- ctx->creds = get_current_cred();
-#ifdef CONFIG_AUDIT
- ctx->loginuid = current->loginuid;
- ctx->sessionid = current->sessionid;
-#endif
- ctx->sqo_task = get_task_struct(current);
+ if (!capable(CAP_IPC_LOCK))
+ ctx->user = get_uid(current_user());
/*
* This is just grabbed for accounting purposes. When a process exits,
mmgrab(current->mm);
ctx->mm_account = current->mm;
-#ifdef CONFIG_BLK_CGROUP
- /*
- * The sq thread will belong to the original cgroup it was inited in.
- * If the cgroup goes offline (e.g. disabling the io controller), then
- * issued bios will be associated with the closest cgroup later in the
- * block layer.
- */
- rcu_read_lock();
- ctx->sqo_blkcg_css = blkcg_css();
- ret = css_tryget_online(ctx->sqo_blkcg_css);
- rcu_read_unlock();
- if (!ret) {
- /* don't init against a dying cgroup, have the user try again */
- ctx->sqo_blkcg_css = NULL;
- ret = -ENODEV;
- goto err;
- }
-#endif
ret = io_allocate_scq_urings(ctx, p);
if (ret)
goto err;
IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
- IORING_FEAT_EXT_ARG;
+ IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS;
if (copy_to_user(params, p, sizeof(*p))) {
ret = -EFAULT;
*/
ret = io_uring_install_fd(ctx, file);
if (ret < 0) {
- io_disable_sqo_submit(ctx);
/* fput will clean it up */
fput(file);
return ret;
trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
return ret;
err:
- io_disable_sqo_submit(ctx);
io_ring_ctx_wait_and_kill(ctx);
return ret;
}
static int io_register_personality(struct io_ring_ctx *ctx)
{
- struct io_identity *id;
+ const struct cred *creds;
int ret;
- id = kmalloc(sizeof(*id), GFP_KERNEL);
- if (unlikely(!id))
- return -ENOMEM;
-
- io_init_identity(id);
- id->creds = get_current_cred();
+ creds = get_current_cred();
- ret = idr_alloc_cyclic(&ctx->personality_idr, id, 1, USHRT_MAX, GFP_KERNEL);
- if (ret < 0) {
- put_cred(id->creds);
- kfree(id);
- }
+ ret = idr_alloc_cyclic(&ctx->personality_idr, (void *) creds, 1,
+ USHRT_MAX, GFP_KERNEL);
+ if (ret < 0)
+ put_cred(creds);
return ret;
}
if (ctx->restrictions.registered)
ctx->restricted = 1;
- ctx->flags &= ~IORING_SETUP_R_DISABLED;
-
io_sq_offload_start(ctx);
-
return 0;
}
mutex_lock(&ctx->uring_lock);
- if (ret && io_refs_resurrect(&ctx->refs, &ctx->ref_comp))
- return ret;
+ if (ret) {
+ percpu_ref_resurrect(&ctx->refs);
+ goto out_quiesce;
+ }
}
if (ctx->restricted) {
if (io_register_op_must_quiesce(opcode)) {
/* bring the ctx back to life */
percpu_ref_reinit(&ctx->refs);
+out_quiesce:
reinit_completion(&ctx->ref_comp);
}
return ret;