io-wq: only remove worker from free_list, if it was there
[linux-2.6-microblaze.git] / fs / io-wq.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Basic worker thread pool for io_uring
4  *
5  * Copyright (C) 2019 Jens Axboe
6  *
7  */
8 #include <linux/kernel.h>
9 #include <linux/init.h>
10 #include <linux/errno.h>
11 #include <linux/sched/signal.h>
12 #include <linux/mm.h>
13 #include <linux/sched/mm.h>
14 #include <linux/percpu.h>
15 #include <linux/slab.h>
16 #include <linux/rculist_nulls.h>
17 #include <linux/cpu.h>
18 #include <linux/tracehook.h>
19
20 #include "../kernel/sched/sched.h"
21 #include "io-wq.h"
22
23 #define WORKER_IDLE_TIMEOUT     (5 * HZ)
24
25 enum {
26         IO_WORKER_F_UP          = 1,    /* up and active */
27         IO_WORKER_F_RUNNING     = 2,    /* account as running */
28         IO_WORKER_F_FREE        = 4,    /* worker on free list */
29         IO_WORKER_F_FIXED       = 8,    /* static idle worker */
30         IO_WORKER_F_BOUND       = 16,   /* is doing bounded work */
31 };
32
33 enum {
34         IO_WQ_BIT_EXIT          = 0,    /* wq exiting */
35         IO_WQ_BIT_ERROR         = 1,    /* error on setup */
36 };
37
38 enum {
39         IO_WQE_FLAG_STALLED     = 1,    /* stalled on hash */
40 };
41
42 /*
43  * One for each thread in a wqe pool
44  */
45 struct io_worker {
46         refcount_t ref;
47         unsigned flags;
48         struct hlist_nulls_node nulls_node;
49         struct list_head all_list;
50         struct task_struct *task;
51         struct io_wqe *wqe;
52
53         struct io_wq_work *cur_work;
54         spinlock_t lock;
55
56         const struct cred *cur_creds;
57         const struct cred *saved_creds;
58
59         struct rcu_head rcu;
60 };
61
62 #if BITS_PER_LONG == 64
63 #define IO_WQ_HASH_ORDER        6
64 #else
65 #define IO_WQ_HASH_ORDER        5
66 #endif
67
68 #define IO_WQ_NR_HASH_BUCKETS   (1u << IO_WQ_HASH_ORDER)
69
70 struct io_wqe_acct {
71         unsigned nr_workers;
72         unsigned max_workers;
73         atomic_t nr_running;
74 };
75
76 enum {
77         IO_WQ_ACCT_BOUND,
78         IO_WQ_ACCT_UNBOUND,
79 };
80
81 /*
82  * Per-node worker thread pool
83  */
84 struct io_wqe {
85         struct {
86                 raw_spinlock_t lock;
87                 struct io_wq_work_list work_list;
88                 unsigned long hash_map;
89                 unsigned flags;
90         } ____cacheline_aligned_in_smp;
91
92         int node;
93         struct io_wqe_acct acct[2];
94
95         struct hlist_nulls_head free_list;
96         struct list_head all_list;
97
98         struct io_wq *wq;
99         struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
100 };
101
102 /*
103  * Per io_wq state
104   */
105 struct io_wq {
106         struct io_wqe **wqes;
107         unsigned long state;
108
109         free_work_fn *free_work;
110         io_wq_work_fn *do_work;
111
112         struct task_struct *manager;
113         struct user_struct *user;
114         refcount_t refs;
115         struct completion done;
116
117         struct hlist_node cpuhp_node;
118
119         pid_t task_pid;
120 };
121
122 static enum cpuhp_state io_wq_online;
123
124 static bool io_worker_get(struct io_worker *worker)
125 {
126         return refcount_inc_not_zero(&worker->ref);
127 }
128
129 static void io_worker_release(struct io_worker *worker)
130 {
131         if (refcount_dec_and_test(&worker->ref))
132                 wake_up_process(worker->task);
133 }
134
135 static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
136                                                    struct io_wq_work *work)
137 {
138         if (work->flags & IO_WQ_WORK_UNBOUND)
139                 return &wqe->acct[IO_WQ_ACCT_UNBOUND];
140
141         return &wqe->acct[IO_WQ_ACCT_BOUND];
142 }
143
144 static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
145 {
146         struct io_wqe *wqe = worker->wqe;
147
148         if (worker->flags & IO_WORKER_F_BOUND)
149                 return &wqe->acct[IO_WQ_ACCT_BOUND];
150
151         return &wqe->acct[IO_WQ_ACCT_UNBOUND];
152 }
153
154 static void io_worker_exit(struct io_worker *worker)
155 {
156         struct io_wqe *wqe = worker->wqe;
157         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
158         unsigned flags;
159
160         /*
161          * If we're not at zero, someone else is holding a brief reference
162          * to the worker. Wait for that to go away.
163          */
164         set_current_state(TASK_INTERRUPTIBLE);
165         if (!refcount_dec_and_test(&worker->ref))
166                 schedule();
167         __set_current_state(TASK_RUNNING);
168
169         preempt_disable();
170         current->flags &= ~PF_IO_WORKER;
171         flags = worker->flags;
172         worker->flags = 0;
173         if (flags & IO_WORKER_F_RUNNING)
174                 atomic_dec(&acct->nr_running);
175         if (!(flags & IO_WORKER_F_BOUND))
176                 atomic_dec(&wqe->wq->user->processes);
177         worker->flags = 0;
178         preempt_enable();
179
180         if (worker->saved_creds) {
181                 revert_creds(worker->saved_creds);
182                 worker->cur_creds = worker->saved_creds = NULL;
183         }
184
185         raw_spin_lock_irq(&wqe->lock);
186         if (flags & IO_WORKER_F_FREE)
187                 hlist_nulls_del_rcu(&worker->nulls_node);
188         list_del_rcu(&worker->all_list);
189         acct->nr_workers--;
190         raw_spin_unlock_irq(&wqe->lock);
191
192         kfree_rcu(worker, rcu);
193         if (refcount_dec_and_test(&wqe->wq->refs))
194                 complete(&wqe->wq->done);
195 }
196
197 static inline bool io_wqe_run_queue(struct io_wqe *wqe)
198         __must_hold(wqe->lock)
199 {
200         if (!wq_list_empty(&wqe->work_list) &&
201             !(wqe->flags & IO_WQE_FLAG_STALLED))
202                 return true;
203         return false;
204 }
205
206 /*
207  * Check head of free list for an available worker. If one isn't available,
208  * caller must wake up the wq manager to create one.
209  */
210 static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
211         __must_hold(RCU)
212 {
213         struct hlist_nulls_node *n;
214         struct io_worker *worker;
215
216         n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
217         if (is_a_nulls(n))
218                 return false;
219
220         worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
221         if (io_worker_get(worker)) {
222                 wake_up_process(worker->task);
223                 io_worker_release(worker);
224                 return true;
225         }
226
227         return false;
228 }
229
230 /*
231  * We need a worker. If we find a free one, we're good. If not, and we're
232  * below the max number of workers, wake up the manager to create one.
233  */
234 static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
235 {
236         bool ret;
237
238         /*
239          * Most likely an attempt to queue unbounded work on an io_wq that
240          * wasn't setup with any unbounded workers.
241          */
242         WARN_ON_ONCE(!acct->max_workers);
243
244         rcu_read_lock();
245         ret = io_wqe_activate_free_worker(wqe);
246         rcu_read_unlock();
247
248         if (!ret && acct->nr_workers < acct->max_workers)
249                 wake_up_process(wqe->wq->manager);
250 }
251
252 static void io_wqe_inc_running(struct io_worker *worker)
253 {
254         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
255
256         atomic_inc(&acct->nr_running);
257 }
258
259 static void io_wqe_dec_running(struct io_worker *worker)
260         __must_hold(wqe->lock)
261 {
262         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
263         struct io_wqe *wqe = worker->wqe;
264
265         if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
266                 io_wqe_wake_worker(wqe, acct);
267 }
268
269 static void io_worker_start(struct io_worker *worker)
270 {
271         worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
272         io_wqe_inc_running(worker);
273 }
274
275 /*
276  * Worker will start processing some work. Move it to the busy list, if
277  * it's currently on the freelist
278  */
279 static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
280                              struct io_wq_work *work)
281         __must_hold(wqe->lock)
282 {
283         bool worker_bound, work_bound;
284
285         if (worker->flags & IO_WORKER_F_FREE) {
286                 worker->flags &= ~IO_WORKER_F_FREE;
287                 hlist_nulls_del_init_rcu(&worker->nulls_node);
288         }
289
290         /*
291          * If worker is moving from bound to unbound (or vice versa), then
292          * ensure we update the running accounting.
293          */
294         worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
295         work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
296         if (worker_bound != work_bound) {
297                 io_wqe_dec_running(worker);
298                 if (work_bound) {
299                         worker->flags |= IO_WORKER_F_BOUND;
300                         wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
301                         wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
302                         atomic_dec(&wqe->wq->user->processes);
303                 } else {
304                         worker->flags &= ~IO_WORKER_F_BOUND;
305                         wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
306                         wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
307                         atomic_inc(&wqe->wq->user->processes);
308                 }
309                 io_wqe_inc_running(worker);
310          }
311 }
312
313 /*
314  * No work, worker going to sleep. Move to freelist, and unuse mm if we
315  * have one attached. Dropping the mm may potentially sleep, so we drop
316  * the lock in that case and return success. Since the caller has to
317  * retry the loop in that case (we changed task state), we don't regrab
318  * the lock if we return success.
319  */
320 static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
321         __must_hold(wqe->lock)
322 {
323         if (!(worker->flags & IO_WORKER_F_FREE)) {
324                 worker->flags |= IO_WORKER_F_FREE;
325                 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
326         }
327         if (worker->saved_creds) {
328                 revert_creds(worker->saved_creds);
329                 worker->cur_creds = worker->saved_creds = NULL;
330         }
331 }
332
333 static inline unsigned int io_get_work_hash(struct io_wq_work *work)
334 {
335         return work->flags >> IO_WQ_HASH_SHIFT;
336 }
337
338 static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
339         __must_hold(wqe->lock)
340 {
341         struct io_wq_work_node *node, *prev;
342         struct io_wq_work *work, *tail;
343         unsigned int hash;
344
345         wq_list_for_each(node, prev, &wqe->work_list) {
346                 work = container_of(node, struct io_wq_work, list);
347
348                 /* not hashed, can run anytime */
349                 if (!io_wq_is_hashed(work)) {
350                         wq_list_del(&wqe->work_list, node, prev);
351                         return work;
352                 }
353
354                 /* hashed, can run if not already running */
355                 hash = io_get_work_hash(work);
356                 if (!(wqe->hash_map & BIT(hash))) {
357                         wqe->hash_map |= BIT(hash);
358                         /* all items with this hash lie in [work, tail] */
359                         tail = wqe->hash_tail[hash];
360                         wqe->hash_tail[hash] = NULL;
361                         wq_list_cut(&wqe->work_list, &tail->list, prev);
362                         return work;
363                 }
364         }
365
366         return NULL;
367 }
368
369 static void io_flush_signals(void)
370 {
371         if (unlikely(test_tsk_thread_flag(current, TIF_NOTIFY_SIGNAL))) {
372                 if (current->task_works)
373                         task_work_run();
374                 clear_tsk_thread_flag(current, TIF_NOTIFY_SIGNAL);
375         }
376 }
377
378 static void io_wq_switch_creds(struct io_worker *worker,
379                                struct io_wq_work *work)
380 {
381         const struct cred *old_creds = override_creds(work->creds);
382
383         worker->cur_creds = work->creds;
384         if (worker->saved_creds)
385                 put_cred(old_creds); /* creds set by previous switch */
386         else
387                 worker->saved_creds = old_creds;
388 }
389
390 static void io_assign_current_work(struct io_worker *worker,
391                                    struct io_wq_work *work)
392 {
393         if (work) {
394                 io_flush_signals();
395                 cond_resched();
396         }
397
398         spin_lock_irq(&worker->lock);
399         worker->cur_work = work;
400         spin_unlock_irq(&worker->lock);
401 }
402
403 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
404
405 static void io_worker_handle_work(struct io_worker *worker)
406         __releases(wqe->lock)
407 {
408         struct io_wqe *wqe = worker->wqe;
409         struct io_wq *wq = wqe->wq;
410
411         do {
412                 struct io_wq_work *work;
413 get_next:
414                 /*
415                  * If we got some work, mark us as busy. If we didn't, but
416                  * the list isn't empty, it means we stalled on hashed work.
417                  * Mark us stalled so we don't keep looking for work when we
418                  * can't make progress, any work completion or insertion will
419                  * clear the stalled flag.
420                  */
421                 work = io_get_next_work(wqe);
422                 if (work)
423                         __io_worker_busy(wqe, worker, work);
424                 else if (!wq_list_empty(&wqe->work_list))
425                         wqe->flags |= IO_WQE_FLAG_STALLED;
426
427                 raw_spin_unlock_irq(&wqe->lock);
428                 if (!work)
429                         break;
430                 io_assign_current_work(worker, work);
431
432                 /* handle a whole dependent link */
433                 do {
434                         struct io_wq_work *next_hashed, *linked;
435                         unsigned int hash = io_get_work_hash(work);
436
437                         next_hashed = wq_next_work(work);
438                         if (work->creds && worker->cur_creds != work->creds)
439                                 io_wq_switch_creds(worker, work);
440                         wq->do_work(work);
441                         io_assign_current_work(worker, NULL);
442
443                         linked = wq->free_work(work);
444                         work = next_hashed;
445                         if (!work && linked && !io_wq_is_hashed(linked)) {
446                                 work = linked;
447                                 linked = NULL;
448                         }
449                         io_assign_current_work(worker, work);
450                         if (linked)
451                                 io_wqe_enqueue(wqe, linked);
452
453                         if (hash != -1U && !next_hashed) {
454                                 raw_spin_lock_irq(&wqe->lock);
455                                 wqe->hash_map &= ~BIT_ULL(hash);
456                                 wqe->flags &= ~IO_WQE_FLAG_STALLED;
457                                 /* skip unnecessary unlock-lock wqe->lock */
458                                 if (!work)
459                                         goto get_next;
460                                 raw_spin_unlock_irq(&wqe->lock);
461                         }
462                 } while (work);
463
464                 raw_spin_lock_irq(&wqe->lock);
465         } while (1);
466 }
467
468 static int io_wqe_worker(void *data)
469 {
470         struct io_worker *worker = data;
471         struct io_wqe *wqe = worker->wqe;
472         struct io_wq *wq = wqe->wq;
473
474         io_worker_start(worker);
475
476         while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
477                 set_current_state(TASK_INTERRUPTIBLE);
478 loop:
479                 raw_spin_lock_irq(&wqe->lock);
480                 if (io_wqe_run_queue(wqe)) {
481                         __set_current_state(TASK_RUNNING);
482                         io_worker_handle_work(worker);
483                         goto loop;
484                 }
485                 __io_worker_idle(wqe, worker);
486                 raw_spin_unlock_irq(&wqe->lock);
487                 io_flush_signals();
488                 if (schedule_timeout(WORKER_IDLE_TIMEOUT))
489                         continue;
490                 if (fatal_signal_pending(current))
491                         break;
492                 /* timed out, exit unless we're the fixed worker */
493                 if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
494                     !(worker->flags & IO_WORKER_F_FIXED))
495                         break;
496         }
497
498         if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
499                 raw_spin_lock_irq(&wqe->lock);
500                 if (!wq_list_empty(&wqe->work_list))
501                         io_worker_handle_work(worker);
502                 else
503                         raw_spin_unlock_irq(&wqe->lock);
504         }
505
506         io_worker_exit(worker);
507         return 0;
508 }
509
510 /*
511  * Called when a worker is scheduled in. Mark us as currently running.
512  */
513 void io_wq_worker_running(struct task_struct *tsk)
514 {
515         struct io_worker *worker = tsk->pf_io_worker;
516
517         if (!worker)
518                 return;
519         if (!(worker->flags & IO_WORKER_F_UP))
520                 return;
521         if (worker->flags & IO_WORKER_F_RUNNING)
522                 return;
523         worker->flags |= IO_WORKER_F_RUNNING;
524         io_wqe_inc_running(worker);
525 }
526
527 /*
528  * Called when worker is going to sleep. If there are no workers currently
529  * running and we have work pending, wake up a free one or have the manager
530  * set one up.
531  */
532 void io_wq_worker_sleeping(struct task_struct *tsk)
533 {
534         struct io_worker *worker = tsk->pf_io_worker;
535
536         if (!worker)
537                 return;
538         if (!(worker->flags & IO_WORKER_F_UP))
539                 return;
540         if (!(worker->flags & IO_WORKER_F_RUNNING))
541                 return;
542
543         worker->flags &= ~IO_WORKER_F_RUNNING;
544
545         raw_spin_lock_irq(&worker->wqe->lock);
546         io_wqe_dec_running(worker);
547         raw_spin_unlock_irq(&worker->wqe->lock);
548 }
549
550 static int task_thread(void *data, int index)
551 {
552         struct io_worker *worker = data;
553         struct io_wqe *wqe = worker->wqe;
554         struct io_wqe_acct *acct = &wqe->acct[index];
555         struct io_wq *wq = wqe->wq;
556         char buf[TASK_COMM_LEN];
557
558         sprintf(buf, "iou-wrk-%d", wq->task_pid);
559         set_task_comm(current, buf);
560
561         current->pf_io_worker = worker;
562         worker->task = current;
563
564         set_cpus_allowed_ptr(current, cpumask_of_node(wqe->node));
565         current->flags |= PF_NO_SETAFFINITY;
566
567         raw_spin_lock_irq(&wqe->lock);
568         hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
569         list_add_tail_rcu(&worker->all_list, &wqe->all_list);
570         worker->flags |= IO_WORKER_F_FREE;
571         if (index == IO_WQ_ACCT_BOUND)
572                 worker->flags |= IO_WORKER_F_BOUND;
573         if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
574                 worker->flags |= IO_WORKER_F_FIXED;
575         acct->nr_workers++;
576         raw_spin_unlock_irq(&wqe->lock);
577
578         if (index == IO_WQ_ACCT_UNBOUND)
579                 atomic_inc(&wq->user->processes);
580
581         io_wqe_worker(data);
582         do_exit(0);
583 }
584
585 static int task_thread_bound(void *data)
586 {
587         return task_thread(data, IO_WQ_ACCT_BOUND);
588 }
589
590 static int task_thread_unbound(void *data)
591 {
592         return task_thread(data, IO_WQ_ACCT_UNBOUND);
593 }
594
595 static pid_t fork_thread(int (*fn)(void *), void *arg)
596 {
597         unsigned long flags = CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|
598                                 CLONE_IO|SIGCHLD;
599         struct kernel_clone_args args = {
600                 .flags          = ((lower_32_bits(flags) | CLONE_VM |
601                                     CLONE_UNTRACED) & ~CSIGNAL),
602                 .exit_signal    = (lower_32_bits(flags) & CSIGNAL),
603                 .stack          = (unsigned long)fn,
604                 .stack_size     = (unsigned long)arg,
605         };
606
607         return kernel_clone(&args);
608 }
609
610 static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
611 {
612         struct io_worker *worker;
613         pid_t pid;
614
615         worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
616         if (!worker)
617                 return false;
618
619         refcount_set(&worker->ref, 1);
620         worker->nulls_node.pprev = NULL;
621         worker->wqe = wqe;
622         spin_lock_init(&worker->lock);
623
624         if (index == IO_WQ_ACCT_BOUND)
625                 pid = fork_thread(task_thread_bound, worker);
626         else
627                 pid = fork_thread(task_thread_unbound, worker);
628         if (pid < 0) {
629                 kfree(worker);
630                 return false;
631         }
632         refcount_inc(&wq->refs);
633         return true;
634 }
635
636 static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
637         __must_hold(wqe->lock)
638 {
639         struct io_wqe_acct *acct = &wqe->acct[index];
640
641         /* if we have available workers or no work, no need */
642         if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
643                 return false;
644         return acct->nr_workers < acct->max_workers;
645 }
646
647 /*
648  * Iterate the passed in list and call the specific function for each
649  * worker that isn't exiting
650  */
651 static bool io_wq_for_each_worker(struct io_wqe *wqe,
652                                   bool (*func)(struct io_worker *, void *),
653                                   void *data)
654 {
655         struct io_worker *worker;
656         bool ret = false;
657
658         list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
659                 if (io_worker_get(worker)) {
660                         /* no task if node is/was offline */
661                         if (worker->task)
662                                 ret = func(worker, data);
663                         io_worker_release(worker);
664                         if (ret)
665                                 break;
666                 }
667         }
668
669         return ret;
670 }
671
672 static bool io_wq_worker_wake(struct io_worker *worker, void *data)
673 {
674         wake_up_process(worker->task);
675         return false;
676 }
677
678 /*
679  * Manager thread. Tasked with creating new workers, if we need them.
680  */
681 static int io_wq_manager(void *data)
682 {
683         struct io_wq *wq = data;
684         char buf[TASK_COMM_LEN];
685         int node;
686
687         sprintf(buf, "iou-mgr-%d", wq->task_pid);
688         set_task_comm(current, buf);
689         current->flags |= PF_IO_WORKER;
690         wq->manager = current;
691
692         complete(&wq->done);
693
694         while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
695                 for_each_node(node) {
696                         struct io_wqe *wqe = wq->wqes[node];
697                         bool fork_worker[2] = { false, false };
698
699                         if (!node_online(node))
700                                 continue;
701
702                         raw_spin_lock_irq(&wqe->lock);
703                         if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
704                                 fork_worker[IO_WQ_ACCT_BOUND] = true;
705                         if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
706                                 fork_worker[IO_WQ_ACCT_UNBOUND] = true;
707                         raw_spin_unlock_irq(&wqe->lock);
708                         if (fork_worker[IO_WQ_ACCT_BOUND])
709                                 create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
710                         if (fork_worker[IO_WQ_ACCT_UNBOUND])
711                                 create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
712                 }
713                 set_current_state(TASK_INTERRUPTIBLE);
714                 schedule_timeout(HZ);
715                 if (fatal_signal_pending(current))
716                         set_bit(IO_WQ_BIT_EXIT, &wq->state);
717         }
718
719         if (refcount_dec_and_test(&wq->refs)) {
720                 complete(&wq->done);
721                 do_exit(0);
722         }
723         /* if ERROR is set and we get here, we have workers to wake */
724         if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
725                 rcu_read_lock();
726                 for_each_node(node)
727                         io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
728                 rcu_read_unlock();
729         }
730         do_exit(0);
731 }
732
733 static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
734                             struct io_wq_work *work)
735 {
736         bool free_worker;
737
738         if (!(work->flags & IO_WQ_WORK_UNBOUND))
739                 return true;
740         if (atomic_read(&acct->nr_running))
741                 return true;
742
743         rcu_read_lock();
744         free_worker = !hlist_nulls_empty(&wqe->free_list);
745         rcu_read_unlock();
746         if (free_worker)
747                 return true;
748
749         if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
750             !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
751                 return false;
752
753         return true;
754 }
755
756 static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
757 {
758         struct io_wq *wq = wqe->wq;
759
760         do {
761                 work->flags |= IO_WQ_WORK_CANCEL;
762                 wq->do_work(work);
763                 work = wq->free_work(work);
764         } while (work);
765 }
766
767 static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
768 {
769         unsigned int hash;
770         struct io_wq_work *tail;
771
772         if (!io_wq_is_hashed(work)) {
773 append:
774                 wq_list_add_tail(&work->list, &wqe->work_list);
775                 return;
776         }
777
778         hash = io_get_work_hash(work);
779         tail = wqe->hash_tail[hash];
780         wqe->hash_tail[hash] = work;
781         if (!tail)
782                 goto append;
783
784         wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
785 }
786
787 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
788 {
789         struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
790         int work_flags;
791         unsigned long flags;
792
793         /*
794          * Do early check to see if we need a new unbound worker, and if we do,
795          * if we're allowed to do so. This isn't 100% accurate as there's a
796          * gap between this check and incrementing the value, but that's OK.
797          * It's close enough to not be an issue, fork() has the same delay.
798          */
799         if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
800                 io_run_cancel(work, wqe);
801                 return;
802         }
803
804         work_flags = work->flags;
805         raw_spin_lock_irqsave(&wqe->lock, flags);
806         io_wqe_insert_work(wqe, work);
807         wqe->flags &= ~IO_WQE_FLAG_STALLED;
808         raw_spin_unlock_irqrestore(&wqe->lock, flags);
809
810         if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
811             !atomic_read(&acct->nr_running))
812                 io_wqe_wake_worker(wqe, acct);
813 }
814
815 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
816 {
817         struct io_wqe *wqe = wq->wqes[numa_node_id()];
818
819         io_wqe_enqueue(wqe, work);
820 }
821
822 /*
823  * Work items that hash to the same value will not be done in parallel.
824  * Used to limit concurrent writes, generally hashed by inode.
825  */
826 void io_wq_hash_work(struct io_wq_work *work, void *val)
827 {
828         unsigned int bit;
829
830         bit = hash_ptr(val, IO_WQ_HASH_ORDER);
831         work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
832 }
833
834 struct io_cb_cancel_data {
835         work_cancel_fn *fn;
836         void *data;
837         int nr_running;
838         int nr_pending;
839         bool cancel_all;
840 };
841
842 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
843 {
844         struct io_cb_cancel_data *match = data;
845         unsigned long flags;
846
847         /*
848          * Hold the lock to avoid ->cur_work going out of scope, caller
849          * may dereference the passed in work.
850          */
851         spin_lock_irqsave(&worker->lock, flags);
852         if (worker->cur_work &&
853             match->fn(worker->cur_work, match->data)) {
854                 set_notify_signal(worker->task);
855                 match->nr_running++;
856         }
857         spin_unlock_irqrestore(&worker->lock, flags);
858
859         return match->nr_running && !match->cancel_all;
860 }
861
862 static inline void io_wqe_remove_pending(struct io_wqe *wqe,
863                                          struct io_wq_work *work,
864                                          struct io_wq_work_node *prev)
865 {
866         unsigned int hash = io_get_work_hash(work);
867         struct io_wq_work *prev_work = NULL;
868
869         if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
870                 if (prev)
871                         prev_work = container_of(prev, struct io_wq_work, list);
872                 if (prev_work && io_get_work_hash(prev_work) == hash)
873                         wqe->hash_tail[hash] = prev_work;
874                 else
875                         wqe->hash_tail[hash] = NULL;
876         }
877         wq_list_del(&wqe->work_list, &work->list, prev);
878 }
879
880 static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
881                                        struct io_cb_cancel_data *match)
882 {
883         struct io_wq_work_node *node, *prev;
884         struct io_wq_work *work;
885         unsigned long flags;
886
887 retry:
888         raw_spin_lock_irqsave(&wqe->lock, flags);
889         wq_list_for_each(node, prev, &wqe->work_list) {
890                 work = container_of(node, struct io_wq_work, list);
891                 if (!match->fn(work, match->data))
892                         continue;
893                 io_wqe_remove_pending(wqe, work, prev);
894                 raw_spin_unlock_irqrestore(&wqe->lock, flags);
895                 io_run_cancel(work, wqe);
896                 match->nr_pending++;
897                 if (!match->cancel_all)
898                         return;
899
900                 /* not safe to continue after unlock */
901                 goto retry;
902         }
903         raw_spin_unlock_irqrestore(&wqe->lock, flags);
904 }
905
906 static void io_wqe_cancel_running_work(struct io_wqe *wqe,
907                                        struct io_cb_cancel_data *match)
908 {
909         rcu_read_lock();
910         io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
911         rcu_read_unlock();
912 }
913
914 enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
915                                   void *data, bool cancel_all)
916 {
917         struct io_cb_cancel_data match = {
918                 .fn             = cancel,
919                 .data           = data,
920                 .cancel_all     = cancel_all,
921         };
922         int node;
923
924         /*
925          * First check pending list, if we're lucky we can just remove it
926          * from there. CANCEL_OK means that the work is returned as-new,
927          * no completion will be posted for it.
928          */
929         for_each_node(node) {
930                 struct io_wqe *wqe = wq->wqes[node];
931
932                 io_wqe_cancel_pending_work(wqe, &match);
933                 if (match.nr_pending && !match.cancel_all)
934                         return IO_WQ_CANCEL_OK;
935         }
936
937         /*
938          * Now check if a free (going busy) or busy worker has the work
939          * currently running. If we find it there, we'll return CANCEL_RUNNING
940          * as an indication that we attempt to signal cancellation. The
941          * completion will run normally in this case.
942          */
943         for_each_node(node) {
944                 struct io_wqe *wqe = wq->wqes[node];
945
946                 io_wqe_cancel_running_work(wqe, &match);
947                 if (match.nr_running && !match.cancel_all)
948                         return IO_WQ_CANCEL_RUNNING;
949         }
950
951         if (match.nr_running)
952                 return IO_WQ_CANCEL_RUNNING;
953         if (match.nr_pending)
954                 return IO_WQ_CANCEL_OK;
955         return IO_WQ_CANCEL_NOTFOUND;
956 }
957
958 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
959 {
960         int ret = -ENOMEM, node;
961         struct io_wq *wq;
962
963         if (WARN_ON_ONCE(!data->free_work || !data->do_work))
964                 return ERR_PTR(-EINVAL);
965
966         wq = kzalloc(sizeof(*wq), GFP_KERNEL);
967         if (!wq)
968                 return ERR_PTR(-ENOMEM);
969
970         wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
971         if (!wq->wqes)
972                 goto err_wq;
973
974         ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
975         if (ret)
976                 goto err_wqes;
977
978         wq->free_work = data->free_work;
979         wq->do_work = data->do_work;
980
981         /* caller must already hold a reference to this */
982         wq->user = data->user;
983
984         ret = -ENOMEM;
985         for_each_node(node) {
986                 struct io_wqe *wqe;
987                 int alloc_node = node;
988
989                 if (!node_online(alloc_node))
990                         alloc_node = NUMA_NO_NODE;
991                 wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
992                 if (!wqe)
993                         goto err;
994                 wq->wqes[node] = wqe;
995                 wqe->node = alloc_node;
996                 wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
997                 atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
998                 if (wq->user) {
999                         wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
1000                                         task_rlimit(current, RLIMIT_NPROC);
1001                 }
1002                 atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
1003                 wqe->wq = wq;
1004                 raw_spin_lock_init(&wqe->lock);
1005                 INIT_WQ_LIST(&wqe->work_list);
1006                 INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
1007                 INIT_LIST_HEAD(&wqe->all_list);
1008         }
1009
1010         wq->task_pid = current->pid;
1011         init_completion(&wq->done);
1012         refcount_set(&wq->refs, 1);
1013
1014         current->flags |= PF_IO_WORKER;
1015         ret = fork_thread(io_wq_manager, wq);
1016         current->flags &= ~PF_IO_WORKER;
1017         if (ret >= 0) {
1018                 wait_for_completion(&wq->done);
1019                 reinit_completion(&wq->done);
1020                 return wq;
1021         }
1022
1023         if (refcount_dec_and_test(&wq->refs))
1024                 complete(&wq->done);
1025 err:
1026         cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1027         for_each_node(node)
1028                 kfree(wq->wqes[node]);
1029 err_wqes:
1030         kfree(wq->wqes);
1031 err_wq:
1032         kfree(wq);
1033         return ERR_PTR(ret);
1034 }
1035
1036 void io_wq_destroy(struct io_wq *wq)
1037 {
1038         int node;
1039
1040         cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1041
1042         set_bit(IO_WQ_BIT_EXIT, &wq->state);
1043         if (wq->manager)
1044                 wake_up_process(wq->manager);
1045
1046         rcu_read_lock();
1047         for_each_node(node)
1048                 io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
1049         rcu_read_unlock();
1050
1051         wait_for_completion(&wq->done);
1052
1053         for_each_node(node)
1054                 kfree(wq->wqes[node]);
1055         kfree(wq->wqes);
1056         kfree(wq);
1057 }
1058
1059 static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
1060 {
1061         struct task_struct *task = worker->task;
1062         struct rq_flags rf;
1063         struct rq *rq;
1064
1065         rq = task_rq_lock(task, &rf);
1066         do_set_cpus_allowed(task, cpumask_of_node(worker->wqe->node));
1067         task->flags |= PF_NO_SETAFFINITY;
1068         task_rq_unlock(rq, task, &rf);
1069         return false;
1070 }
1071
1072 static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
1073 {
1074         struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1075         int i;
1076
1077         rcu_read_lock();
1078         for_each_node(i)
1079                 io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, NULL);
1080         rcu_read_unlock();
1081         return 0;
1082 }
1083
1084 static __init int io_wq_init(void)
1085 {
1086         int ret;
1087
1088         ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
1089                                         io_wq_cpu_online, NULL);
1090         if (ret < 0)
1091                 return ret;
1092         io_wq_online = ret;
1093         return 0;
1094 }
1095 subsys_initcall(io_wq_init);