sched: Prevent balance_push() on remote runqueues
[linux-2.6-microblaze.git] / fs / io-wq.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Basic worker thread pool for io_uring
4  *
5  * Copyright (C) 2019 Jens Axboe
6  *
7  */
8 #include <linux/kernel.h>
9 #include <linux/init.h>
10 #include <linux/errno.h>
11 #include <linux/sched/signal.h>
12 #include <linux/percpu.h>
13 #include <linux/slab.h>
14 #include <linux/rculist_nulls.h>
15 #include <linux/cpu.h>
16 #include <linux/tracehook.h>
17
18 #include "io-wq.h"
19
20 #define WORKER_IDLE_TIMEOUT     (5 * HZ)
21
22 enum {
23         IO_WORKER_F_UP          = 1,    /* up and active */
24         IO_WORKER_F_RUNNING     = 2,    /* account as running */
25         IO_WORKER_F_FREE        = 4,    /* worker on free list */
26         IO_WORKER_F_FIXED       = 8,    /* static idle worker */
27         IO_WORKER_F_BOUND       = 16,   /* is doing bounded work */
28 };
29
30 enum {
31         IO_WQ_BIT_EXIT          = 0,    /* wq exiting */
32 };
33
34 enum {
35         IO_WQE_FLAG_STALLED     = 1,    /* stalled on hash */
36 };
37
38 /*
39  * One for each thread in a wqe pool
40  */
41 struct io_worker {
42         refcount_t ref;
43         unsigned flags;
44         struct hlist_nulls_node nulls_node;
45         struct list_head all_list;
46         struct task_struct *task;
47         struct io_wqe *wqe;
48
49         struct io_wq_work *cur_work;
50         spinlock_t lock;
51
52         struct completion ref_done;
53
54         struct rcu_head rcu;
55 };
56
57 #if BITS_PER_LONG == 64
58 #define IO_WQ_HASH_ORDER        6
59 #else
60 #define IO_WQ_HASH_ORDER        5
61 #endif
62
63 #define IO_WQ_NR_HASH_BUCKETS   (1u << IO_WQ_HASH_ORDER)
64
65 struct io_wqe_acct {
66         unsigned nr_workers;
67         unsigned max_workers;
68         int index;
69         atomic_t nr_running;
70 };
71
72 enum {
73         IO_WQ_ACCT_BOUND,
74         IO_WQ_ACCT_UNBOUND,
75 };
76
77 /*
78  * Per-node worker thread pool
79  */
80 struct io_wqe {
81         struct {
82                 raw_spinlock_t lock;
83                 struct io_wq_work_list work_list;
84                 unsigned flags;
85         } ____cacheline_aligned_in_smp;
86
87         int node;
88         struct io_wqe_acct acct[2];
89
90         struct hlist_nulls_head free_list;
91         struct list_head all_list;
92
93         struct wait_queue_entry wait;
94
95         struct io_wq *wq;
96         struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
97
98         cpumask_var_t cpu_mask;
99 };
100
101 /*
102  * Per io_wq state
103   */
104 struct io_wq {
105         unsigned long state;
106
107         free_work_fn *free_work;
108         io_wq_work_fn *do_work;
109
110         struct io_wq_hash *hash;
111
112         atomic_t worker_refs;
113         struct completion worker_done;
114
115         struct hlist_node cpuhp_node;
116
117         struct task_struct *task;
118
119         struct io_wqe *wqes[];
120 };
121
122 static enum cpuhp_state io_wq_online;
123
124 struct io_cb_cancel_data {
125         work_cancel_fn *fn;
126         void *data;
127         int nr_running;
128         int nr_pending;
129         bool cancel_all;
130 };
131
132 static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first);
133 static void io_wqe_dec_running(struct io_worker *worker);
134
135 static bool io_worker_get(struct io_worker *worker)
136 {
137         return refcount_inc_not_zero(&worker->ref);
138 }
139
140 static void io_worker_release(struct io_worker *worker)
141 {
142         if (refcount_dec_and_test(&worker->ref))
143                 complete(&worker->ref_done);
144 }
145
146 static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
147 {
148         return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
149 }
150
151 static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
152                                                    struct io_wq_work *work)
153 {
154         return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
155 }
156
157 static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
158 {
159         return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
160 }
161
162 static void io_worker_ref_put(struct io_wq *wq)
163 {
164         if (atomic_dec_and_test(&wq->worker_refs))
165                 complete(&wq->worker_done);
166 }
167
168 static void io_worker_exit(struct io_worker *worker)
169 {
170         struct io_wqe *wqe = worker->wqe;
171         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
172
173         if (refcount_dec_and_test(&worker->ref))
174                 complete(&worker->ref_done);
175         wait_for_completion(&worker->ref_done);
176
177         raw_spin_lock_irq(&wqe->lock);
178         if (worker->flags & IO_WORKER_F_FREE)
179                 hlist_nulls_del_rcu(&worker->nulls_node);
180         list_del_rcu(&worker->all_list);
181         acct->nr_workers--;
182         preempt_disable();
183         io_wqe_dec_running(worker);
184         worker->flags = 0;
185         current->flags &= ~PF_IO_WORKER;
186         preempt_enable();
187         raw_spin_unlock_irq(&wqe->lock);
188
189         kfree_rcu(worker, rcu);
190         io_worker_ref_put(wqe->wq);
191         do_exit(0);
192 }
193
194 static inline bool io_wqe_run_queue(struct io_wqe *wqe)
195         __must_hold(wqe->lock)
196 {
197         if (!wq_list_empty(&wqe->work_list) &&
198             !(wqe->flags & IO_WQE_FLAG_STALLED))
199                 return true;
200         return false;
201 }
202
203 /*
204  * Check head of free list for an available worker. If one isn't available,
205  * caller must create one.
206  */
207 static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
208         __must_hold(RCU)
209 {
210         struct hlist_nulls_node *n;
211         struct io_worker *worker;
212
213         /*
214          * Iterate free_list and see if we can find an idle worker to
215          * activate. If a given worker is on the free_list but in the process
216          * of exiting, keep trying.
217          */
218         hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
219                 if (!io_worker_get(worker))
220                         continue;
221                 if (wake_up_process(worker->task)) {
222                         io_worker_release(worker);
223                         return true;
224                 }
225                 io_worker_release(worker);
226         }
227
228         return false;
229 }
230
231 /*
232  * We need a worker. If we find a free one, we're good. If not, and we're
233  * below the max number of workers, create one.
234  */
235 static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
236 {
237         bool ret;
238
239         /*
240          * Most likely an attempt to queue unbounded work on an io_wq that
241          * wasn't setup with any unbounded workers.
242          */
243         if (unlikely(!acct->max_workers))
244                 pr_warn_once("io-wq is not configured for unbound workers");
245
246         rcu_read_lock();
247         ret = io_wqe_activate_free_worker(wqe);
248         rcu_read_unlock();
249
250         if (!ret) {
251                 bool do_create = false, first = false;
252
253                 raw_spin_lock_irq(&wqe->lock);
254                 if (acct->nr_workers < acct->max_workers) {
255                         atomic_inc(&acct->nr_running);
256                         atomic_inc(&wqe->wq->worker_refs);
257                         if (!acct->nr_workers)
258                                 first = true;
259                         acct->nr_workers++;
260                         do_create = true;
261                 }
262                 raw_spin_unlock_irq(&wqe->lock);
263                 if (do_create)
264                         create_io_worker(wqe->wq, wqe, acct->index, first);
265         }
266 }
267
268 static void io_wqe_inc_running(struct io_worker *worker)
269 {
270         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
271
272         atomic_inc(&acct->nr_running);
273 }
274
275 struct create_worker_data {
276         struct callback_head work;
277         struct io_wqe *wqe;
278         int index;
279 };
280
281 static void create_worker_cb(struct callback_head *cb)
282 {
283         struct create_worker_data *cwd;
284         struct io_wq *wq;
285         struct io_wqe *wqe;
286         struct io_wqe_acct *acct;
287         bool do_create = false, first = false;
288
289         cwd = container_of(cb, struct create_worker_data, work);
290         wqe = cwd->wqe;
291         wq = wqe->wq;
292         acct = &wqe->acct[cwd->index];
293         raw_spin_lock_irq(&wqe->lock);
294         if (acct->nr_workers < acct->max_workers) {
295                 if (!acct->nr_workers)
296                         first = true;
297                 acct->nr_workers++;
298                 do_create = true;
299         }
300         raw_spin_unlock_irq(&wqe->lock);
301         if (do_create) {
302                 create_io_worker(wq, wqe, cwd->index, first);
303         } else {
304                 atomic_dec(&acct->nr_running);
305                 io_worker_ref_put(wq);
306         }
307         kfree(cwd);
308 }
309
310 static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
311 {
312         struct create_worker_data *cwd;
313         struct io_wq *wq = wqe->wq;
314
315         /* raced with exit, just ignore create call */
316         if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
317                 goto fail;
318
319         cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
320         if (cwd) {
321                 init_task_work(&cwd->work, create_worker_cb);
322                 cwd->wqe = wqe;
323                 cwd->index = acct->index;
324                 if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
325                         return;
326
327                 kfree(cwd);
328         }
329 fail:
330         atomic_dec(&acct->nr_running);
331         io_worker_ref_put(wq);
332 }
333
334 static void io_wqe_dec_running(struct io_worker *worker)
335         __must_hold(wqe->lock)
336 {
337         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
338         struct io_wqe *wqe = worker->wqe;
339
340         if (!(worker->flags & IO_WORKER_F_UP))
341                 return;
342
343         if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
344                 atomic_inc(&acct->nr_running);
345                 atomic_inc(&wqe->wq->worker_refs);
346                 io_queue_worker_create(wqe, acct);
347         }
348 }
349
350 /*
351  * Worker will start processing some work. Move it to the busy list, if
352  * it's currently on the freelist
353  */
354 static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
355                              struct io_wq_work *work)
356         __must_hold(wqe->lock)
357 {
358         bool worker_bound, work_bound;
359
360         BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);
361
362         if (worker->flags & IO_WORKER_F_FREE) {
363                 worker->flags &= ~IO_WORKER_F_FREE;
364                 hlist_nulls_del_init_rcu(&worker->nulls_node);
365         }
366
367         /*
368          * If worker is moving from bound to unbound (or vice versa), then
369          * ensure we update the running accounting.
370          */
371         worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
372         work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
373         if (worker_bound != work_bound) {
374                 int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
375                 io_wqe_dec_running(worker);
376                 worker->flags ^= IO_WORKER_F_BOUND;
377                 wqe->acct[index].nr_workers--;
378                 wqe->acct[index ^ 1].nr_workers++;
379                 io_wqe_inc_running(worker);
380          }
381 }
382
383 /*
384  * No work, worker going to sleep. Move to freelist, and unuse mm if we
385  * have one attached. Dropping the mm may potentially sleep, so we drop
386  * the lock in that case and return success. Since the caller has to
387  * retry the loop in that case (we changed task state), we don't regrab
388  * the lock if we return success.
389  */
390 static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
391         __must_hold(wqe->lock)
392 {
393         if (!(worker->flags & IO_WORKER_F_FREE)) {
394                 worker->flags |= IO_WORKER_F_FREE;
395                 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
396         }
397 }
398
399 static inline unsigned int io_get_work_hash(struct io_wq_work *work)
400 {
401         return work->flags >> IO_WQ_HASH_SHIFT;
402 }
403
404 static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
405 {
406         struct io_wq *wq = wqe->wq;
407
408         spin_lock(&wq->hash->wait.lock);
409         if (list_empty(&wqe->wait.entry)) {
410                 __add_wait_queue(&wq->hash->wait, &wqe->wait);
411                 if (!test_bit(hash, &wq->hash->map)) {
412                         __set_current_state(TASK_RUNNING);
413                         list_del_init(&wqe->wait.entry);
414                 }
415         }
416         spin_unlock(&wq->hash->wait.lock);
417 }
418
419 static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
420         __must_hold(wqe->lock)
421 {
422         struct io_wq_work_node *node, *prev;
423         struct io_wq_work *work, *tail;
424         unsigned int stall_hash = -1U;
425
426         wq_list_for_each(node, prev, &wqe->work_list) {
427                 unsigned int hash;
428
429                 work = container_of(node, struct io_wq_work, list);
430
431                 /* not hashed, can run anytime */
432                 if (!io_wq_is_hashed(work)) {
433                         wq_list_del(&wqe->work_list, node, prev);
434                         return work;
435                 }
436
437                 hash = io_get_work_hash(work);
438                 /* all items with this hash lie in [work, tail] */
439                 tail = wqe->hash_tail[hash];
440
441                 /* hashed, can run if not already running */
442                 if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
443                         wqe->hash_tail[hash] = NULL;
444                         wq_list_cut(&wqe->work_list, &tail->list, prev);
445                         return work;
446                 }
447                 if (stall_hash == -1U)
448                         stall_hash = hash;
449                 /* fast forward to a next hash, for-each will fix up @prev */
450                 node = &tail->list;
451         }
452
453         if (stall_hash != -1U) {
454                 raw_spin_unlock(&wqe->lock);
455                 io_wait_on_hash(wqe, stall_hash);
456                 raw_spin_lock(&wqe->lock);
457         }
458
459         return NULL;
460 }
461
462 static bool io_flush_signals(void)
463 {
464         if (unlikely(test_thread_flag(TIF_NOTIFY_SIGNAL))) {
465                 __set_current_state(TASK_RUNNING);
466                 tracehook_notify_signal();
467                 return true;
468         }
469         return false;
470 }
471
472 static void io_assign_current_work(struct io_worker *worker,
473                                    struct io_wq_work *work)
474 {
475         if (work) {
476                 io_flush_signals();
477                 cond_resched();
478         }
479
480         spin_lock_irq(&worker->lock);
481         worker->cur_work = work;
482         spin_unlock_irq(&worker->lock);
483 }
484
485 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
486
487 static void io_worker_handle_work(struct io_worker *worker)
488         __releases(wqe->lock)
489 {
490         struct io_wqe *wqe = worker->wqe;
491         struct io_wq *wq = wqe->wq;
492         bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
493
494         do {
495                 struct io_wq_work *work;
496 get_next:
497                 /*
498                  * If we got some work, mark us as busy. If we didn't, but
499                  * the list isn't empty, it means we stalled on hashed work.
500                  * Mark us stalled so we don't keep looking for work when we
501                  * can't make progress, any work completion or insertion will
502                  * clear the stalled flag.
503                  */
504                 work = io_get_next_work(wqe);
505                 if (work)
506                         __io_worker_busy(wqe, worker, work);
507                 else if (!wq_list_empty(&wqe->work_list))
508                         wqe->flags |= IO_WQE_FLAG_STALLED;
509
510                 raw_spin_unlock_irq(&wqe->lock);
511                 if (!work)
512                         break;
513                 io_assign_current_work(worker, work);
514                 __set_current_state(TASK_RUNNING);
515
516                 /* handle a whole dependent link */
517                 do {
518                         struct io_wq_work *next_hashed, *linked;
519                         unsigned int hash = io_get_work_hash(work);
520
521                         next_hashed = wq_next_work(work);
522
523                         if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
524                                 work->flags |= IO_WQ_WORK_CANCEL;
525                         wq->do_work(work);
526                         io_assign_current_work(worker, NULL);
527
528                         linked = wq->free_work(work);
529                         work = next_hashed;
530                         if (!work && linked && !io_wq_is_hashed(linked)) {
531                                 work = linked;
532                                 linked = NULL;
533                         }
534                         io_assign_current_work(worker, work);
535                         if (linked)
536                                 io_wqe_enqueue(wqe, linked);
537
538                         if (hash != -1U && !next_hashed) {
539                                 clear_bit(hash, &wq->hash->map);
540                                 if (wq_has_sleeper(&wq->hash->wait))
541                                         wake_up(&wq->hash->wait);
542                                 raw_spin_lock_irq(&wqe->lock);
543                                 wqe->flags &= ~IO_WQE_FLAG_STALLED;
544                                 /* skip unnecessary unlock-lock wqe->lock */
545                                 if (!work)
546                                         goto get_next;
547                                 raw_spin_unlock_irq(&wqe->lock);
548                         }
549                 } while (work);
550
551                 raw_spin_lock_irq(&wqe->lock);
552         } while (1);
553 }
554
555 static int io_wqe_worker(void *data)
556 {
557         struct io_worker *worker = data;
558         struct io_wqe *wqe = worker->wqe;
559         struct io_wq *wq = wqe->wq;
560         char buf[TASK_COMM_LEN];
561
562         worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
563
564         snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
565         set_task_comm(current, buf);
566
567         while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
568                 long ret;
569
570                 set_current_state(TASK_INTERRUPTIBLE);
571 loop:
572                 raw_spin_lock_irq(&wqe->lock);
573                 if (io_wqe_run_queue(wqe)) {
574                         io_worker_handle_work(worker);
575                         goto loop;
576                 }
577                 __io_worker_idle(wqe, worker);
578                 raw_spin_unlock_irq(&wqe->lock);
579                 if (io_flush_signals())
580                         continue;
581                 ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
582                 if (signal_pending(current)) {
583                         struct ksignal ksig;
584
585                         if (!get_signal(&ksig))
586                                 continue;
587                         break;
588                 }
589                 if (ret)
590                         continue;
591                 /* timed out, exit unless we're the fixed worker */
592                 if (!(worker->flags & IO_WORKER_F_FIXED))
593                         break;
594         }
595
596         if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
597                 raw_spin_lock_irq(&wqe->lock);
598                 io_worker_handle_work(worker);
599         }
600
601         io_worker_exit(worker);
602         return 0;
603 }
604
605 /*
606  * Called when a worker is scheduled in. Mark us as currently running.
607  */
608 void io_wq_worker_running(struct task_struct *tsk)
609 {
610         struct io_worker *worker = tsk->pf_io_worker;
611
612         if (!worker)
613                 return;
614         if (!(worker->flags & IO_WORKER_F_UP))
615                 return;
616         if (worker->flags & IO_WORKER_F_RUNNING)
617                 return;
618         worker->flags |= IO_WORKER_F_RUNNING;
619         io_wqe_inc_running(worker);
620 }
621
622 /*
623  * Called when worker is going to sleep. If there are no workers currently
624  * running and we have work pending, wake up a free one or create a new one.
625  */
626 void io_wq_worker_sleeping(struct task_struct *tsk)
627 {
628         struct io_worker *worker = tsk->pf_io_worker;
629
630         if (!worker)
631                 return;
632         if (!(worker->flags & IO_WORKER_F_UP))
633                 return;
634         if (!(worker->flags & IO_WORKER_F_RUNNING))
635                 return;
636
637         worker->flags &= ~IO_WORKER_F_RUNNING;
638
639         raw_spin_lock_irq(&worker->wqe->lock);
640         io_wqe_dec_running(worker);
641         raw_spin_unlock_irq(&worker->wqe->lock);
642 }
643
644 static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first)
645 {
646         struct io_wqe_acct *acct = &wqe->acct[index];
647         struct io_worker *worker;
648         struct task_struct *tsk;
649
650         __set_current_state(TASK_RUNNING);
651
652         worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
653         if (!worker)
654                 goto fail;
655
656         refcount_set(&worker->ref, 1);
657         worker->nulls_node.pprev = NULL;
658         worker->wqe = wqe;
659         spin_lock_init(&worker->lock);
660         init_completion(&worker->ref_done);
661
662         tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
663         if (IS_ERR(tsk)) {
664                 kfree(worker);
665 fail:
666                 atomic_dec(&acct->nr_running);
667                 raw_spin_lock_irq(&wqe->lock);
668                 acct->nr_workers--;
669                 raw_spin_unlock_irq(&wqe->lock);
670                 io_worker_ref_put(wq);
671                 return;
672         }
673
674         tsk->pf_io_worker = worker;
675         worker->task = tsk;
676         set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
677         tsk->flags |= PF_NO_SETAFFINITY;
678
679         raw_spin_lock_irq(&wqe->lock);
680         hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
681         list_add_tail_rcu(&worker->all_list, &wqe->all_list);
682         worker->flags |= IO_WORKER_F_FREE;
683         if (index == IO_WQ_ACCT_BOUND)
684                 worker->flags |= IO_WORKER_F_BOUND;
685         if (first && (worker->flags & IO_WORKER_F_BOUND))
686                 worker->flags |= IO_WORKER_F_FIXED;
687         raw_spin_unlock_irq(&wqe->lock);
688         wake_up_new_task(tsk);
689 }
690
691 /*
692  * Iterate the passed in list and call the specific function for each
693  * worker that isn't exiting
694  */
695 static bool io_wq_for_each_worker(struct io_wqe *wqe,
696                                   bool (*func)(struct io_worker *, void *),
697                                   void *data)
698 {
699         struct io_worker *worker;
700         bool ret = false;
701
702         list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
703                 if (io_worker_get(worker)) {
704                         /* no task if node is/was offline */
705                         if (worker->task)
706                                 ret = func(worker, data);
707                         io_worker_release(worker);
708                         if (ret)
709                                 break;
710                 }
711         }
712
713         return ret;
714 }
715
716 static bool io_wq_worker_wake(struct io_worker *worker, void *data)
717 {
718         set_notify_signal(worker->task);
719         wake_up_process(worker->task);
720         return false;
721 }
722
723 static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
724 {
725         return true;
726 }
727
728 static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
729 {
730         struct io_wq *wq = wqe->wq;
731
732         do {
733                 work->flags |= IO_WQ_WORK_CANCEL;
734                 wq->do_work(work);
735                 work = wq->free_work(work);
736         } while (work);
737 }
738
739 static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
740 {
741         unsigned int hash;
742         struct io_wq_work *tail;
743
744         if (!io_wq_is_hashed(work)) {
745 append:
746                 wq_list_add_tail(&work->list, &wqe->work_list);
747                 return;
748         }
749
750         hash = io_get_work_hash(work);
751         tail = wqe->hash_tail[hash];
752         wqe->hash_tail[hash] = work;
753         if (!tail)
754                 goto append;
755
756         wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
757 }
758
759 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
760 {
761         struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
762         int work_flags;
763         unsigned long flags;
764
765         /*
766          * If io-wq is exiting for this task, or if the request has explicitly
767          * been marked as one that should not get executed, cancel it here.
768          */
769         if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
770             (work->flags & IO_WQ_WORK_CANCEL)) {
771                 io_run_cancel(work, wqe);
772                 return;
773         }
774
775         work_flags = work->flags;
776         raw_spin_lock_irqsave(&wqe->lock, flags);
777         io_wqe_insert_work(wqe, work);
778         wqe->flags &= ~IO_WQE_FLAG_STALLED;
779         raw_spin_unlock_irqrestore(&wqe->lock, flags);
780
781         if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
782             !atomic_read(&acct->nr_running))
783                 io_wqe_wake_worker(wqe, acct);
784 }
785
786 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
787 {
788         struct io_wqe *wqe = wq->wqes[numa_node_id()];
789
790         io_wqe_enqueue(wqe, work);
791 }
792
793 /*
794  * Work items that hash to the same value will not be done in parallel.
795  * Used to limit concurrent writes, generally hashed by inode.
796  */
797 void io_wq_hash_work(struct io_wq_work *work, void *val)
798 {
799         unsigned int bit;
800
801         bit = hash_ptr(val, IO_WQ_HASH_ORDER);
802         work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
803 }
804
805 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
806 {
807         struct io_cb_cancel_data *match = data;
808         unsigned long flags;
809
810         /*
811          * Hold the lock to avoid ->cur_work going out of scope, caller
812          * may dereference the passed in work.
813          */
814         spin_lock_irqsave(&worker->lock, flags);
815         if (worker->cur_work &&
816             match->fn(worker->cur_work, match->data)) {
817                 set_notify_signal(worker->task);
818                 match->nr_running++;
819         }
820         spin_unlock_irqrestore(&worker->lock, flags);
821
822         return match->nr_running && !match->cancel_all;
823 }
824
825 static inline void io_wqe_remove_pending(struct io_wqe *wqe,
826                                          struct io_wq_work *work,
827                                          struct io_wq_work_node *prev)
828 {
829         unsigned int hash = io_get_work_hash(work);
830         struct io_wq_work *prev_work = NULL;
831
832         if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
833                 if (prev)
834                         prev_work = container_of(prev, struct io_wq_work, list);
835                 if (prev_work && io_get_work_hash(prev_work) == hash)
836                         wqe->hash_tail[hash] = prev_work;
837                 else
838                         wqe->hash_tail[hash] = NULL;
839         }
840         wq_list_del(&wqe->work_list, &work->list, prev);
841 }
842
843 static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
844                                        struct io_cb_cancel_data *match)
845 {
846         struct io_wq_work_node *node, *prev;
847         struct io_wq_work *work;
848         unsigned long flags;
849
850 retry:
851         raw_spin_lock_irqsave(&wqe->lock, flags);
852         wq_list_for_each(node, prev, &wqe->work_list) {
853                 work = container_of(node, struct io_wq_work, list);
854                 if (!match->fn(work, match->data))
855                         continue;
856                 io_wqe_remove_pending(wqe, work, prev);
857                 raw_spin_unlock_irqrestore(&wqe->lock, flags);
858                 io_run_cancel(work, wqe);
859                 match->nr_pending++;
860                 if (!match->cancel_all)
861                         return;
862
863                 /* not safe to continue after unlock */
864                 goto retry;
865         }
866         raw_spin_unlock_irqrestore(&wqe->lock, flags);
867 }
868
869 static void io_wqe_cancel_running_work(struct io_wqe *wqe,
870                                        struct io_cb_cancel_data *match)
871 {
872         rcu_read_lock();
873         io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
874         rcu_read_unlock();
875 }
876
877 enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
878                                   void *data, bool cancel_all)
879 {
880         struct io_cb_cancel_data match = {
881                 .fn             = cancel,
882                 .data           = data,
883                 .cancel_all     = cancel_all,
884         };
885         int node;
886
887         /*
888          * First check pending list, if we're lucky we can just remove it
889          * from there. CANCEL_OK means that the work is returned as-new,
890          * no completion will be posted for it.
891          */
892         for_each_node(node) {
893                 struct io_wqe *wqe = wq->wqes[node];
894
895                 io_wqe_cancel_pending_work(wqe, &match);
896                 if (match.nr_pending && !match.cancel_all)
897                         return IO_WQ_CANCEL_OK;
898         }
899
900         /*
901          * Now check if a free (going busy) or busy worker has the work
902          * currently running. If we find it there, we'll return CANCEL_RUNNING
903          * as an indication that we attempt to signal cancellation. The
904          * completion will run normally in this case.
905          */
906         for_each_node(node) {
907                 struct io_wqe *wqe = wq->wqes[node];
908
909                 io_wqe_cancel_running_work(wqe, &match);
910                 if (match.nr_running && !match.cancel_all)
911                         return IO_WQ_CANCEL_RUNNING;
912         }
913
914         if (match.nr_running)
915                 return IO_WQ_CANCEL_RUNNING;
916         if (match.nr_pending)
917                 return IO_WQ_CANCEL_OK;
918         return IO_WQ_CANCEL_NOTFOUND;
919 }
920
921 static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
922                             int sync, void *key)
923 {
924         struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
925
926         list_del_init(&wait->entry);
927
928         rcu_read_lock();
929         io_wqe_activate_free_worker(wqe);
930         rcu_read_unlock();
931         return 1;
932 }
933
934 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
935 {
936         int ret, node;
937         struct io_wq *wq;
938
939         if (WARN_ON_ONCE(!data->free_work || !data->do_work))
940                 return ERR_PTR(-EINVAL);
941         if (WARN_ON_ONCE(!bounded))
942                 return ERR_PTR(-EINVAL);
943
944         wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
945         if (!wq)
946                 return ERR_PTR(-ENOMEM);
947         ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
948         if (ret)
949                 goto err_wq;
950
951         refcount_inc(&data->hash->refs);
952         wq->hash = data->hash;
953         wq->free_work = data->free_work;
954         wq->do_work = data->do_work;
955
956         ret = -ENOMEM;
957         for_each_node(node) {
958                 struct io_wqe *wqe;
959                 int alloc_node = node;
960
961                 if (!node_online(alloc_node))
962                         alloc_node = NUMA_NO_NODE;
963                 wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
964                 if (!wqe)
965                         goto err;
966                 if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
967                         goto err;
968                 cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
969                 wq->wqes[node] = wqe;
970                 wqe->node = alloc_node;
971                 wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
972                 wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
973                 wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
974                 atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
975                 wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
976                                         task_rlimit(current, RLIMIT_NPROC);
977                 atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
978                 wqe->wait.func = io_wqe_hash_wake;
979                 INIT_LIST_HEAD(&wqe->wait.entry);
980                 wqe->wq = wq;
981                 raw_spin_lock_init(&wqe->lock);
982                 INIT_WQ_LIST(&wqe->work_list);
983                 INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
984                 INIT_LIST_HEAD(&wqe->all_list);
985         }
986
987         wq->task = get_task_struct(data->task);
988         atomic_set(&wq->worker_refs, 1);
989         init_completion(&wq->worker_done);
990         return wq;
991 err:
992         io_wq_put_hash(data->hash);
993         cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
994         for_each_node(node) {
995                 if (!wq->wqes[node])
996                         continue;
997                 free_cpumask_var(wq->wqes[node]->cpu_mask);
998                 kfree(wq->wqes[node]);
999         }
1000 err_wq:
1001         kfree(wq);
1002         return ERR_PTR(ret);
1003 }
1004
1005 static bool io_task_work_match(struct callback_head *cb, void *data)
1006 {
1007         struct create_worker_data *cwd;
1008
1009         if (cb->func != create_worker_cb)
1010                 return false;
1011         cwd = container_of(cb, struct create_worker_data, work);
1012         return cwd->wqe->wq == data;
1013 }
1014
1015 void io_wq_exit_start(struct io_wq *wq)
1016 {
1017         set_bit(IO_WQ_BIT_EXIT, &wq->state);
1018 }
1019
1020 static void io_wq_exit_workers(struct io_wq *wq)
1021 {
1022         struct callback_head *cb;
1023         int node;
1024
1025         if (!wq->task)
1026                 return;
1027
1028         while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
1029                 struct create_worker_data *cwd;
1030
1031                 cwd = container_of(cb, struct create_worker_data, work);
1032                 atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
1033                 io_worker_ref_put(wq);
1034                 kfree(cwd);
1035         }
1036
1037         rcu_read_lock();
1038         for_each_node(node) {
1039                 struct io_wqe *wqe = wq->wqes[node];
1040
1041                 io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
1042         }
1043         rcu_read_unlock();
1044         io_worker_ref_put(wq);
1045         wait_for_completion(&wq->worker_done);
1046
1047         for_each_node(node) {
1048                 spin_lock_irq(&wq->hash->wait.lock);
1049                 list_del_init(&wq->wqes[node]->wait.entry);
1050                 spin_unlock_irq(&wq->hash->wait.lock);
1051         }
1052         put_task_struct(wq->task);
1053         wq->task = NULL;
1054 }
1055
1056 static void io_wq_destroy(struct io_wq *wq)
1057 {
1058         int node;
1059
1060         cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1061
1062         for_each_node(node) {
1063                 struct io_wqe *wqe = wq->wqes[node];
1064                 struct io_cb_cancel_data match = {
1065                         .fn             = io_wq_work_match_all,
1066                         .cancel_all     = true,
1067                 };
1068                 io_wqe_cancel_pending_work(wqe, &match);
1069                 free_cpumask_var(wqe->cpu_mask);
1070                 kfree(wqe);
1071         }
1072         io_wq_put_hash(wq->hash);
1073         kfree(wq);
1074 }
1075
1076 void io_wq_put_and_exit(struct io_wq *wq)
1077 {
1078         WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));
1079
1080         io_wq_exit_workers(wq);
1081         io_wq_destroy(wq);
1082 }
1083
1084 struct online_data {
1085         unsigned int cpu;
1086         bool online;
1087 };
1088
1089 static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
1090 {
1091         struct online_data *od = data;
1092
1093         if (od->online)
1094                 cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
1095         else
1096                 cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
1097         return false;
1098 }
1099
1100 static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
1101 {
1102         struct online_data od = {
1103                 .cpu = cpu,
1104                 .online = online
1105         };
1106         int i;
1107
1108         rcu_read_lock();
1109         for_each_node(i)
1110                 io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
1111         rcu_read_unlock();
1112         return 0;
1113 }
1114
1115 static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
1116 {
1117         struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1118
1119         return __io_wq_cpu_online(wq, cpu, true);
1120 }
1121
1122 static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
1123 {
1124         struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1125
1126         return __io_wq_cpu_online(wq, cpu, false);
1127 }
1128
1129 int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
1130 {
1131         int i;
1132
1133         rcu_read_lock();
1134         for_each_node(i) {
1135                 struct io_wqe *wqe = wq->wqes[i];
1136
1137                 if (mask)
1138                         cpumask_copy(wqe->cpu_mask, mask);
1139                 else
1140                         cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
1141         }
1142         rcu_read_unlock();
1143         return 0;
1144 }
1145
1146 static __init int io_wq_init(void)
1147 {
1148         int ret;
1149
1150         ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
1151                                         io_wq_cpu_online, io_wq_cpu_offline);
1152         if (ret < 0)
1153                 return ret;
1154         io_wq_online = ret;
1155         return 0;
1156 }
1157 subsys_initcall(io_wq_init);