#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>
+#include <linux/fs_struct.h>
#include "io-wq.h"
const struct cred *cur_creds;
const struct cred *saved_creds;
struct files_struct *restore_files;
+ struct fs_struct *restore_fs;
};
#if BITS_PER_LONG == 64
task_unlock(current);
}
+ if (current->fs != worker->restore_fs)
+ current->fs = worker->restore_fs;
+
/*
* If we have an active mm, we need to drop the wq lock before unusing
* it. If we do, return true and let the caller retry the idle loop.
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
worker->restore_files = current->files;
+ worker->restore_fs = current->fs;
io_wqe_inc_running(wqe, worker);
}
current->files = work->files;
task_unlock(current);
}
+ if (work->fs && current->fs != work->fs)
+ current->fs = work->fs;
if (work->mm != worker->mm)
io_wq_switch_mm(worker, work);
if (worker->cur_creds != work->creds)
/* create fixed workers */
refcount_set(&wq->refs, workers_to_create);
for_each_node(node) {
+ if (!node_online(node))
+ continue;
if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
goto err;
workers_to_create--;
}
+ while (workers_to_create--)
+ refcount_dec(&wq->refs);
+
complete(&wq->done);
while (!kthread_should_stop()) {
struct io_wqe *wqe = wq->wqes[node];
bool fork_worker[2] = { false, false };
+ if (!node_online(node))
+ continue;
+
spin_lock_irq(&wqe->lock);
if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
fork_worker[IO_WQ_ACCT_BOUND] = true;
list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
if (io_worker_get(worker)) {
- ret = func(worker, data);
+ /* no task if node is/was offline */
+ if (worker->task)
+ ret = func(worker, data);
io_worker_release(worker);
if (ret)
break;
return ret;
}
+struct work_match {
+ bool (*fn)(struct io_wq_work *, void *data);
+ void *data;
+};
+
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
- struct io_wq_work *work = data;
+ struct work_match *match = data;
unsigned long flags;
bool ret = false;
- if (worker->cur_work != work)
- return false;
-
spin_lock_irqsave(&worker->lock, flags);
- if (worker->cur_work == work &&
+ if (match->fn(worker->cur_work, match->data) &&
!(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL)) {
send_sig(SIGINT, worker->task, 1);
ret = true;
}
static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
- struct io_wq_work *cwork)
+ struct work_match *match)
{
struct io_wq_work_node *node, *prev;
struct io_wq_work *work;
unsigned long flags;
bool found = false;
- cwork->flags |= IO_WQ_WORK_CANCEL;
-
/*
* First check pending list, if we're lucky we can just remove it
* from there. CANCEL_OK means that the work is returned as-new,
wq_list_for_each(node, prev, &wqe->work_list) {
work = container_of(node, struct io_wq_work, list);
- if (work == cwork) {
+ if (match->fn(work, match->data)) {
wq_node_del(&wqe->work_list, node, prev);
found = true;
break;
* completion will run normally in this case.
*/
rcu_read_lock();
- found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork);
+ found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
rcu_read_unlock();
return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}
+static bool io_wq_work_match(struct io_wq_work *work, void *data)
+{
+ return work == data;
+}
+
enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
+ struct work_match match = {
+ .fn = io_wq_work_match,
+ .data = cwork
+ };
enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
int node;
+ cwork->flags |= IO_WQ_WORK_CANCEL;
+
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
- ret = io_wqe_cancel_work(wqe, cwork);
+ ret = io_wqe_cancel_work(wqe, &match);
+ if (ret != IO_WQ_CANCEL_NOTFOUND)
+ break;
+ }
+
+ return ret;
+}
+
+static bool io_wq_pid_match(struct io_wq_work *work, void *data)
+{
+ pid_t pid = (pid_t) (unsigned long) data;
+
+ if (work)
+ return work->task_pid == pid;
+ return false;
+}
+
+enum io_wq_cancel io_wq_cancel_pid(struct io_wq *wq, pid_t pid)
+{
+ struct work_match match = {
+ .fn = io_wq_pid_match,
+ .data = (void *) (unsigned long) pid
+ };
+ enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
+ int node;
+
+ for_each_node(node) {
+ struct io_wqe *wqe = wq->wqes[node];
+
+ ret = io_wqe_cancel_work(wqe, &match);
if (ret != IO_WQ_CANCEL_NOTFOUND)
break;
}
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
+ if (!node_online(node))
+ continue;
init_completion(&data.done);
INIT_IO_WORK(&data.work, io_wq_flush_func);
data.work.flags |= IO_WQ_WORK_INTERNAL;
for_each_node(node) {
struct io_wqe *wqe;
+ int alloc_node = node;
- wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node);
+ if (!node_online(alloc_node))
+ alloc_node = NUMA_NO_NODE;
+ wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
if (!wqe)
goto err;
wq->wqes[node] = wqe;
- wqe->node = node;
+ wqe->node = alloc_node;
wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
if (wq->user) {
task_rlimit(current, RLIMIT_NPROC);
}
atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
- wqe->node = node;
wqe->wq = wq;
spin_lock_init(&wqe->lock);
INIT_WQ_LIST(&wqe->work_list);