io_uring: pin req->task, rework iopoll completion and task_work notification
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 155f3d8..d37d7ea 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -541,6 +541,7 @@ enum {
        REQ_F_NO_FILE_TABLE_BIT,
        REQ_F_QUEUE_TIMEOUT_BIT,
        REQ_F_WORK_INITIALIZED_BIT,
+       REQ_F_TASK_PINNED_BIT,
 
        /* not a real bit, just to check we're not overflowing the space */
        __REQ_F_LAST_BIT,
@@ -598,6 +599,8 @@ enum {
        REQ_F_QUEUE_TIMEOUT     = BIT(REQ_F_QUEUE_TIMEOUT_BIT),
        /* io_wq_work is initialized */
        REQ_F_WORK_INITIALIZED  = BIT(REQ_F_WORK_INITIALIZED_BIT),
+       /* req->task is refcounted */
+       REQ_F_TASK_PINNED       = BIT(REQ_F_TASK_PINNED_BIT),
 };
 
 struct async_poll {
@@ -887,6 +890,7 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
                                 struct io_uring_files_update *ip,
                                 unsigned nr_args);
 static int io_grab_files(struct io_kiocb *req);
+static void io_complete_rw_common(struct kiocb *kiocb, long res);
 static void io_cleanup_req(struct io_kiocb *req);
 static int io_file_get(struct io_submit_state *state, struct io_kiocb *req,
                       int fd, struct file **out_file, bool fixed);
@@ -910,6 +914,21 @@ struct sock *io_uring_get_socket(struct file *file)
 }
 EXPORT_SYMBOL(io_uring_get_socket);
 
+static void io_get_req_task(struct io_kiocb *req)
+{
+       if (req->flags & REQ_F_TASK_PINNED)
+               return;
+       get_task_struct(req->task);
+       req->flags |= REQ_F_TASK_PINNED;
+}
+
+/* not idempotent -- it doesn't clear REQ_F_TASK_PINNED */
+static void __io_put_req_task(struct io_kiocb *req)
+{
+       if (req->flags & REQ_F_TASK_PINNED)
+               put_task_struct(req->task);
+}
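
A sketch of the intended pairing (illustration only; example_prep/example_free are hypothetical call sites): req->task is always assigned at init time, but a task reference is only taken by paths that need the task to outlive submission, and dropped once at free time.

/* example_prep()/example_free() are made-up names, for illustration only */
static void example_prep(struct io_kiocb *req)
{
	io_get_req_task(req);		/* pins req->task at most once */
}

static void example_free(struct io_kiocb *req)
{
	__io_put_req_task(req);		/* drops the ref only if REQ_F_TASK_PINNED is set */
}
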
+
 static void io_file_put_work(struct work_struct *work);
 
 /*
@@ -1045,8 +1064,6 @@ static inline void io_req_work_grab_env(struct io_kiocb *req,
                }
                spin_unlock(&current->fs->lock);
        }
-       if (!req->work.task_pid)
-               req->work.task_pid = task_pid_vnr(current);
 }
 
 static inline void io_req_work_drop_env(struct io_kiocb *req)
@@ -1087,6 +1104,7 @@ static inline void io_prep_async_work(struct io_kiocb *req,
                        req->work.flags |= IO_WQ_WORK_UNBOUND;
        }
 
+       io_req_init_async(req);
        io_req_work_grab_env(req, def);
 
        *link = io_prep_linked_timeout(req);
@@ -1398,9 +1416,7 @@ static void __io_req_aux_free(struct io_kiocb *req)
        kfree(req->io);
        if (req->file)
                io_put_file(req, req->file, (req->flags & REQ_F_FIXED_FILE));
-       if (req->task)
-               put_task_struct(req->task);
-
+       __io_put_req_task(req);
        io_req_work_drop_env(req);
 }
 
@@ -1727,6 +1743,26 @@ static int io_put_kbuf(struct io_kiocb *req)
        return cflags;
 }
 
+static void io_iopoll_queue(struct list_head *again)
+{
+       struct io_kiocb *req;
+
+       do {
+               req = list_first_entry(again, struct io_kiocb, list);
+               list_del(&req->list);
+
+               /* shouldn't happen unless io_uring is dying, cancel reqs */
+               if (unlikely(!current->mm)) {
+                       io_complete_rw_common(&req->rw.kiocb, -EAGAIN);
+                       io_put_req(req);
+                       continue;
+               }
+
+               refcount_inc(&req->refs);
+               io_queue_async_work(req);
+       } while (!list_empty(again));
+}
+
 /*
  * Find and free completed poll iocbs
  */
@@ -1735,12 +1771,21 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
 {
        struct req_batch rb;
        struct io_kiocb *req;
+       LIST_HEAD(again);
+
+       /* order with ->result store in io_complete_rw_iopoll() */
+       smp_rmb();
 
        rb.to_free = rb.need_iter = 0;
        while (!list_empty(done)) {
                int cflags = 0;
 
                req = list_first_entry(done, struct io_kiocb, list);
+               if (READ_ONCE(req->result) == -EAGAIN) {
+                       req->iopoll_completed = 0;
+                       list_move_tail(&req->list, &again);
+                       continue;
+               }
                list_del(&req->list);
 
                if (req->flags & REQ_F_BUFFER_SELECTED)
@@ -1758,18 +1803,9 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
        if (ctx->flags & IORING_SETUP_SQPOLL)
                io_cqring_ev_posted(ctx);
        io_free_req_many(ctx, &rb);
-}
-
-static void io_iopoll_queue(struct list_head *again)
-{
-       struct io_kiocb *req;
 
-       do {
-               req = list_first_entry(again, struct io_kiocb, list);
-               list_del(&req->list);
-               refcount_inc(&req->refs);
-               io_queue_async_work(req);
-       } while (!list_empty(again));
+       if (!list_empty(&again))
+               io_iopoll_queue(&again);
 }
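
Taken together with the io_do_iopoll() and io_complete_rw_iopoll() hunks below, this moves -EAGAIN handling out of the poll loop: requests that completed with -EAGAIN are detected while reaping, put back into an uncompleted state and re-queued to io-wq with an extra reference, or failed outright if the ring is dying and no mm is left. A rough sketch of the resulting reap path, for illustration only:

/*
 *	io_do_iopoll()
 *	    moves reqs with iopoll_completed == 1 onto the 'done' list
 *	io_iopoll_complete(done)
 *	    result == -EAGAIN -> clear iopoll_completed, move to 'again'
 *	    otherwise         -> post CQE, batch-free the request
 *	io_iopoll_queue(again)
 *	    refcount_inc(&req->refs); io_queue_async_work(req);
 *	    (completed with -EAGAIN instead if current->mm is gone)
 */
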
 
 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
@@ -1777,7 +1813,6 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
 {
        struct io_kiocb *req, *tmp;
        LIST_HEAD(done);
-       LIST_HEAD(again);
        bool spin;
        int ret;
 
@@ -1803,13 +1838,6 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
                if (!list_empty(&done))
                        break;
 
-               if (req->result == -EAGAIN) {
-                       list_move_tail(&req->list, &again);
-                       continue;
-               }
-               if (!list_empty(&again))
-                       break;
-
                ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
                if (ret < 0)
                        break;
@@ -1822,9 +1850,6 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
        if (!list_empty(&done))
                io_iopoll_complete(ctx, nr_events, &done);
 
-       if (!list_empty(&again))
-               io_iopoll_queue(&again);
-
        return ret;
 }
 
@@ -1973,11 +1998,13 @@ static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
        if (kiocb->ki_flags & IOCB_WRITE)
                kiocb_end_write(req);
 
-       if (res != req->result)
+       if (res != -EAGAIN && res != req->result)
                req_set_fail_links(req);
-       req->result = res;
-       if (res != -EAGAIN)
-               WRITE_ONCE(req->iopoll_completed, 1);
+
+       WRITE_ONCE(req->result, res);
+       /* order with io_iopoll_complete() checking ->result */
+       smp_wmb();
+       WRITE_ONCE(req->iopoll_completed, 1);
 }
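
The smp_wmb() added here pairs with the smp_rmb() in io_iopoll_complete() above: the completion side publishes ->result before setting ->iopoll_completed, and the reaping side observes ->iopoll_completed before it reads ->result. A condensed view of the ordering, for illustration only:

/*
 *   io_complete_rw_iopoll()                  io_do_iopoll() / io_iopoll_complete()
 *
 *   WRITE_ONCE(req->result, res);            if (READ_ONCE(req->iopoll_completed))
 *   smp_wmb();                                       move req to the done list;
 *   WRITE_ONCE(req->iopoll_completed, 1);    smp_rmb();
 *                                            res = READ_ONCE(req->result);
 *
 * Once the reaper sees iopoll_completed == 1 it is guaranteed to also see the
 * final ->result, so an -EAGAIN completion cannot be missed or misread.
 */
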
 
 /*
@@ -2650,8 +2677,8 @@ copy_iov:
                }
        }
 out_free:
-       kfree(iovec);
-       req->flags &= ~REQ_F_NEED_CLEANUP;
+       if (!(req->flags & REQ_F_NEED_CLEANUP))
+               kfree(iovec);
        return ret;
 }
 
@@ -2773,8 +2800,8 @@ copy_iov:
                }
        }
 out_free:
-       req->flags &= ~REQ_F_NEED_CLEANUP;
-       kfree(iovec);
+       if (!(req->flags & REQ_F_NEED_CLEANUP))
+               kfree(iovec);
        return ret;
 }
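
In both the read and the write completion paths above, the temporary iovec is now freed only when REQ_F_NEED_CLEANUP is clear: when the flag is set, the iovec has been stashed in req->io for an async retry and io_cleanup_req() owns freeing it, so freeing it here (or clearing the flag) risks a double free or a leak. The ownership rule, summarised for illustration:

/*
 *   REQ_F_NEED_CLEANUP clear -> iovec is local to this call, kfree() it here
 *   REQ_F_NEED_CLEANUP set   -> iovec lives in req->io for the async retry,
 *                               io_cleanup_req() frees it when the req dies
 */
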
 
@@ -4045,6 +4072,29 @@ struct io_poll_table {
        int error;
 };
 
+static int io_req_task_work_add(struct io_kiocb *req, struct callback_head *cb)
+{
+       struct task_struct *tsk = req->task;
+       struct io_ring_ctx *ctx = req->ctx;
+       int ret, notify = TWA_RESUME;
+
+       /*
+        * SQPOLL kernel thread doesn't need notification, just a wakeup.
+        * If we're not using an eventfd, then TWA_RESUME is always fine,
+        * as we won't have dependencies between request completions for
+        * other kernel wait conditions.
+        */
+       if (ctx->flags & IORING_SETUP_SQPOLL)
+               notify = 0;
+       else if (ctx->cq_ev_fd)
+               notify = TWA_SIGNAL;
+
+       ret = task_work_add(tsk, cb, notify);
+       if (!ret)
+               wake_up_process(tsk);
+       return ret;
+}
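
io_req_task_work_add() selects the task_work notification mode based on how completions are consumed; only the eventfd case needs the heavier TWA_SIGNAL, because a waiter may be blocked in schedule() (e.g. in io_cqring_wait()) and must be knocked out of it. The three cases, summarised for illustration:

/*
 *   IORING_SETUP_SQPOLL:  notify = 0            kernel thread; the explicit
 *                                                wake_up_process() is enough
 *   ctx->cq_ev_fd set:    notify = TWA_SIGNAL   sets TIF_SIGPENDING (via
 *                                                JOBCTL_TASK_WORK) so a task
 *                                                sleeping in schedule() wakes up
 *   otherwise:            notify = TWA_RESUME   task_work runs on the next
 *                                                return to user space
 */
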
+
 static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
                           __poll_t mask, task_work_func_t func)
 {
@@ -4068,13 +4118,13 @@ static int __io_async_wake(struct io_kiocb *req, struct io_poll_iocb *poll,
         * of executing it. We can't safely execute it anyway, as we may not
         * have the needed state needed for it anyway.
         */
-       ret = task_work_add(tsk, &req->task_work, true);
+       ret = io_req_task_work_add(req, &req->task_work);
        if (unlikely(ret)) {
                WRITE_ONCE(poll->canceled, true);
                tsk = io_wq_get_task(req->ctx->io_wq);
-               task_work_add(tsk, &req->task_work, true);
+               task_work_add(tsk, &req->task_work, 0);
+               wake_up_process(tsk);
        }
-       wake_up_process(tsk);
        return 1;
 }
 
@@ -4236,6 +4286,28 @@ static void io_async_queue_proc(struct file *file, struct wait_queue_head *head,
        __io_queue_proc(&pt->req->apoll->poll, pt, head);
 }
 
+static void io_sq_thread_drop_mm(struct io_ring_ctx *ctx)
+{
+       struct mm_struct *mm = current->mm;
+
+       if (mm) {
+               kthread_unuse_mm(mm);
+               mmput(mm);
+       }
+}
+
+static int io_sq_thread_acquire_mm(struct io_ring_ctx *ctx,
+                                  struct io_kiocb *req)
+{
+       if (io_op_defs[req->opcode].needs_mm && !current->mm) {
+               if (unlikely(!mmget_not_zero(ctx->sqo_mm)))
+                       return -EFAULT;
+               kthread_use_mm(ctx->sqo_mm);
+       }
+
+       return 0;
+}
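
These helpers factor out the mm handling that io_init_req() used to do inline, so the task_work path in io_async_task_func() below can attach the submitter's mm as well. The expected pairing in the SQPOLL thread, sketched for illustration only:

/*
 *	io_sq_thread_acquire_mm(ctx, req)  before issuing an opcode whose
 *	                                   io_op_defs entry has .needs_mm
 *	                                   (mmget_not_zero + kthread_use_mm)
 *	io_sq_thread_drop_mm(ctx)          before the SQ thread schedules out
 *	                                   (kthread_unuse_mm + mmput)
 *
 * If mmget_not_zero() fails the submitter is exiting, so the request is
 * failed with -EFAULT instead of being issued.
 */
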
+
 static void io_async_task_func(struct callback_head *cb)
 {
        struct io_kiocb *req = container_of(cb, struct io_kiocb, task_work);
@@ -4270,11 +4342,16 @@ static void io_async_task_func(struct callback_head *cb)
 
        if (!canceled) {
                __set_current_state(TASK_RUNNING);
+               if (io_sq_thread_acquire_mm(ctx, req)) {
+                       io_cqring_add_event(req, -EFAULT);
+                       goto end_req;
+               }
                mutex_lock(&ctx->uring_lock);
                __io_queue_sqe(req, NULL);
                mutex_unlock(&ctx->uring_lock);
        } else {
                io_cqring_ev_posted(ctx);
+end_req:
                req_set_fail_links(req);
                io_double_put_req(req);
        }
@@ -4366,8 +4443,7 @@ static bool io_arm_poll_handler(struct io_kiocb *req)
                memcpy(&apoll->work, &req->work, sizeof(req->work));
        had_io = req->io != NULL;
 
-       get_task_struct(current);
-       req->task = current;
+       io_get_req_task(req);
        req->apoll = apoll;
        INIT_HLIST_NODE(&req->hash_node);
 
@@ -4555,8 +4631,7 @@ static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe
        events = READ_ONCE(sqe->poll_events);
        poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
 
-       get_task_struct(current);
-       req->task = current;
+       io_get_req_task(req);
        return 0;
 }
 
@@ -4772,7 +4847,7 @@ static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
        enum io_wq_cancel cancel_ret;
        int ret = 0;
 
-       cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
+       cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr, false);
        switch (cancel_ret) {
        case IO_WQ_CANCEL_OK:
                ret = 0;
@@ -5308,9 +5383,6 @@ static int io_issue_sqe(struct io_kiocb *req, const struct io_uring_sqe *sqe,
        if ((ctx->flags & IORING_SETUP_IOPOLL) && req->file) {
                const bool in_async = io_wq_current_is_worker();
 
-               if (req->result == -EAGAIN)
-                       return -EAGAIN;
-
                /* workqueue context doesn't hold uring_lock, grab it now */
                if (in_async)
                        mutex_lock(&ctx->uring_lock);
@@ -5817,17 +5889,14 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
        req->flags = 0;
        /* one is dropped after submission, the other at completion */
        refcount_set(&req->refs, 2);
-       req->task = NULL;
+       req->task = current;
        req->result = 0;
 
        if (unlikely(req->opcode >= IORING_OP_LAST))
                return -EINVAL;
 
-       if (io_op_defs[req->opcode].needs_mm && !current->mm) {
-               if (unlikely(!mmget_not_zero(ctx->sqo_mm)))
-                       return -EFAULT;
-               kthread_use_mm(ctx->sqo_mm);
-       }
+       if (unlikely(io_sq_thread_acquire_mm(ctx, req)))
+               return -EFAULT;
 
        sqe_flags = READ_ONCE(sqe->flags);
        /* enforce forwards compatibility on users */
@@ -5936,16 +6005,6 @@ fail_req:
        return submitted;
 }
 
-static inline void io_sq_thread_drop_mm(struct io_ring_ctx *ctx)
-{
-       struct mm_struct *mm = current->mm;
-
-       if (mm) {
-               kthread_unuse_mm(mm);
-               mmput(mm);
-       }
-}
-
 static int io_sq_thread(void *data)
 {
        struct io_ring_ctx *ctx = data;
@@ -5979,7 +6038,7 @@ static int io_sq_thread(void *data)
                 * If submit got -EBUSY, flag us as needing the application
                 * to enter the kernel to reap and flush events.
                 */
-               if (!to_submit || ret == -EBUSY) {
+               if (!to_submit || ret == -EBUSY || need_resched()) {
                        /*
                         * Drop cur_mm before scheduling, we can't hold it for
                         * long periods (or over schedule()). Do this before
@@ -5995,7 +6054,7 @@ static int io_sq_thread(void *data)
                         * more IO, we should wait for the application to
                         * reap events and wake us up.
                         */
-                       if (!list_empty(&ctx->poll_list) ||
+                       if (!list_empty(&ctx->poll_list) || need_resched() ||
                            (!time_after(jiffies, timeout) && ret != -EBUSY &&
                            !percpu_ref_is_dying(&ctx->refs))) {
                                if (current->task_works)
@@ -6146,15 +6205,23 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
        do {
                prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
                                                TASK_INTERRUPTIBLE);
+               /* make sure we run task_work before checking for signals */
                if (current->task_works)
                        task_work_run();
-               if (io_should_wake(&iowq, false))
-                       break;
-               schedule();
                if (signal_pending(current)) {
+                       if (current->jobctl & JOBCTL_TASK_WORK) {
+                               spin_lock_irq(&current->sighand->siglock);
+                               current->jobctl &= ~JOBCTL_TASK_WORK;
+                               recalc_sigpending();
+                               spin_unlock_irq(&current->sighand->siglock);
+                               continue;
+                       }
                        ret = -EINTR;
                        break;
                }
+               if (io_should_wake(&iowq, false))
+                       break;
+               schedule();
        } while (1);
        finish_wait(&ctx->wait, &iowq.wq);
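
The reworked wait loop runs pending task_work before checking for signals, and treats a wakeup caused purely by TWA_SIGNAL-style task_work (JOBCTL_TASK_WORK set) as spurious: the flag is cleared, sigpending state is recomputed, and the loop retries instead of returning -EINTR. The loop order, compressed for illustration:

/*
 *	prepare_to_wait_exclusive(...);
 *	task_work_run();                  may post the completion being waited on
 *	if (signal_pending()) {
 *	        if (JOBCTL_TASK_WORK)     only task_work woke us up: clear it,
 *	                continue;         recalc_sigpending(), try again
 *	        return -EINTR;            a real signal is pending
 *	}
 *	if (io_should_wake())
 *	        break;
 *	schedule();
 */
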
 
@@ -7331,7 +7398,17 @@ static void io_ring_exit_work(struct work_struct *work)
        if (ctx->rings)
                io_cqring_overflow_flush(ctx, true);
 
-       wait_for_completion(&ctx->ref_comp);
+       /*
+        * If we're doing polled IO and end up having requests being
+        * submitted async (out-of-line), then completions can come in while
+        * we're waiting for refs to drop. We need to reap these manually,
+        * as nobody else will be looking for them.
+        */
+       while (!wait_for_completion_timeout(&ctx->ref_comp, HZ/20)) {
+               io_iopoll_reap_events(ctx);
+               if (ctx->rings)
+                       io_cqring_overflow_flush(ctx, true);
+       }
        io_ring_ctx_free(ctx);
 }
 
@@ -7365,9 +7442,22 @@ static int io_uring_release(struct inode *inode, struct file *file)
        return 0;
 }
 
+static bool io_wq_files_match(struct io_wq_work *work, void *data)
+{
+       struct files_struct *files = data;
+
+       return work->files == files;
+}
+
 static void io_uring_cancel_files(struct io_ring_ctx *ctx,
                                  struct files_struct *files)
 {
+       if (list_empty_careful(&ctx->inflight_list))
+               return;
+
+       /* cancel all at once, should be faster than doing it one by one */
+       io_wq_cancel_cb(ctx->io_wq, io_wq_files_match, files, true);
+
        while (!list_empty_careful(&ctx->inflight_list)) {
                struct io_kiocb *cancel_req = NULL, *req;
                DEFINE_WAIT(wait);
@@ -7423,6 +7513,14 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx,
        }
 }
 
+static bool io_cancel_task_cb(struct io_wq_work *work, void *data)
+{
+       struct io_kiocb *req = container_of(work, struct io_kiocb, work);
+       struct task_struct *task = data;
+
+       return req->task == task;
+}
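
io_wq_cancel_cb() now takes a matcher callback plus a cancel_all flag; with cancel_all set it cancels every matching work item in one pass, which is what io_wq_files_match() (match by files_struct) and io_cancel_task_cb() (match by owning task) rely on, and what replaces the old pid-based io_wq_cancel_pid(). The signature as assumed by these call sites (see io-wq.h for the authoritative prototype):

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq,
				  bool (*cancel)(struct io_wq_work *work, void *data),
				  void *data, bool cancel_all);
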
+
 static int io_uring_flush(struct file *file, void *data)
 {
        struct io_ring_ctx *ctx = file->private_data;
@@ -7433,7 +7531,7 @@ static int io_uring_flush(struct file *file, void *data)
         * If the task is going away, cancel work it may have pending
         */
        if (fatal_signal_pending(current) || (current->flags & PF_EXITING))
-               io_wq_cancel_pid(ctx->io_wq, task_pid_vnr(current));
+               io_wq_cancel_cb(ctx->io_wq, io_cancel_task_cb, current, true);
 
        return 0;
 }