workqueue: Enable unbound cpumask update on ordered workqueues
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index ee6aa1b..da12485 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -29,6 +29,7 @@
 #include <linux/kernel.h>
 #include <linux/sched.h>
 #include <linux/init.h>
+#include <linux/interrupt.h>
 #include <linux/signal.h>
 #include <linux/completion.h>
 #include <linux/workqueue.h>
@@ -56,7 +57,7 @@
 
 #include "workqueue_internal.h"
 
-enum {
+enum worker_pool_flags {
        /*
         * worker_pool flags
         *
@@ -72,10 +73,16 @@ enum {
         * Note that DISASSOCIATED should be flipped only while holding
         * wq_pool_attach_mutex to avoid changing binding state while
         * worker_attach_to_pool() is in progress.
+        *
+        * As there can only be one concurrent BH execution context per CPU, a
+        * BH pool is per-CPU and always DISASSOCIATED.
         */
-       POOL_MANAGER_ACTIVE     = 1 << 0,       /* being managed */
+       POOL_BH                 = 1 << 0,       /* is a BH pool */
+       POOL_MANAGER_ACTIVE     = 1 << 1,       /* being managed */
        POOL_DISASSOCIATED      = 1 << 2,       /* cpu can't serve workers */
+};
 
+enum worker_flags {
        /* worker flags */
        WORKER_DIE              = 1 << 1,       /* die die die */
        WORKER_IDLE             = 1 << 2,       /* is idle */
@@ -86,7 +93,9 @@ enum {
 
        WORKER_NOT_RUNNING      = WORKER_PREP | WORKER_CPU_INTENSIVE |
                                  WORKER_UNBOUND | WORKER_REBOUND,
+};
 
+enum wq_internal_consts {
        NR_STD_WORKER_POOLS     = 2,            /* # standard pools per cpu */
 
        UNBOUND_POOL_HASH_ORDER = 6,            /* hashed by pool->attrs */
@@ -111,6 +120,14 @@ enum {
        WQ_NAME_LEN             = 32,
 };
 
+/*
+ * We don't want to trap softirq for too long. See MAX_SOFTIRQ_TIME and
+ * MAX_SOFTIRQ_RESTART in kernel/softirq.c. These are macros because
+ * msecs_to_jiffies() can't be an initializer.
+ */
+#define BH_WORKER_JIFFIES      msecs_to_jiffies(2)
+#define BH_WORKER_RESTARTS     10
+
 /*
  * Structure fields follow one of the following exclusion rules.
  *
@@ -122,6 +139,9 @@ enum {
  *
  * L: pool->lock protected.  Access with pool->lock held.
  *
+ * LN: pool->lock and wq_node_nr_active->lock protected for writes. Either for
+ *     reads.
+ *
  * K: Only modified by worker while holding pool->lock. Can be safely read by
  *    self, while holding pool->lock or from IRQ context if %current is the
  *    kworker.
@@ -143,6 +163,9 @@ enum {
  *
  * WR: wq->mutex protected for writes.  RCU protected for reads.
  *
+ * WO: wq->mutex protected for writes. Updated with WRITE_ONCE() and can be read
+ *     with READ_ONCE() without locking.
+ *
  * MD: wq_mayday_lock protected.
  *
  * WD: Used internally by the watchdog.
@@ -232,6 +255,7 @@ struct pool_workqueue {
        int                     refcnt;         /* L: reference count */
        int                     nr_in_flight[WORK_NR_COLORS];
                                                /* L: nr of in_flight works */
+       bool                    plugged;        /* L: execution suspended */
 
        /*
         * nr_active management and WORK_STRUCT_INACTIVE:
@@ -240,18 +264,18 @@ struct pool_workqueue {
         * pwq->inactive_works instead of pool->worklist and marked with
         * WORK_STRUCT_INACTIVE.
         *
-        * All work items marked with WORK_STRUCT_INACTIVE do not participate
-        * in pwq->nr_active and all work items in pwq->inactive_works are
-        * marked with WORK_STRUCT_INACTIVE.  But not all WORK_STRUCT_INACTIVE
-        * work items are in pwq->inactive_works.  Some of them are ready to
-        * run in pool->worklist or worker->scheduled.  Those work itmes are
-        * only struct wq_barrier which is used for flush_work() and should
-        * not participate in pwq->nr_active.  For non-barrier work item, it
-        * is marked with WORK_STRUCT_INACTIVE iff it is in pwq->inactive_works.
+        * All work items marked with WORK_STRUCT_INACTIVE do not participate in
+        * nr_active and all work items in pwq->inactive_works are marked with
+        * WORK_STRUCT_INACTIVE. But not all WORK_STRUCT_INACTIVE work items are
+        * in pwq->inactive_works. Some of them are ready to run in
+        * pool->worklist or worker->scheduled. Those work items are only struct
+        * wq_barrier which is used for flush_work() and should not participate
+        * in nr_active. A non-barrier work item is marked with
+        * WORK_STRUCT_INACTIVE iff it is in pwq->inactive_works.
         */
        int                     nr_active;      /* L: nr of active works */
-       int                     max_active;     /* L: max active works */
        struct list_head        inactive_works; /* L: inactive works */
+       struct list_head        pending_node;   /* LN: node on wq_node_nr_active->pending_pwqs */
        struct list_head        pwqs_node;      /* WR: node on wq->pwqs */
        struct list_head        mayday_node;    /* MD: node on wq->maydays */
 
@@ -278,6 +302,26 @@ struct wq_flusher {
 
 struct wq_device;
 
+/*
+ * Unlike in a per-cpu workqueue where max_active limits its concurrency level
+ * on each CPU, in an unbound workqueue, max_active applies to the whole system.
+ * As sharing a single nr_active across multiple sockets can be very expensive,
+ * the counting and enforcement is per NUMA node.
+ *
+ * The following struct is used to enforce per-node max_active. When a pwq wants
+ * to start executing a work item, it should increment ->nr using
+ * tryinc_node_nr_active(). If acquisition fails due to ->nr already being over
+ * ->max, the pwq is queued on ->pending_pwqs. As in-flight work items finish
+ * and decrement ->nr, node_activate_pending_pwq() activates the pending pwqs in
+ * round-robin order.
+ */
+struct wq_node_nr_active {
+       int                     max;            /* per-node max_active */
+       atomic_t                nr;             /* per-node nr_active */
+       raw_spinlock_t          lock;           /* nests inside pool locks */
+       struct list_head        pending_pwqs;   /* LN: pwqs with inactive works */
+};
+
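
The comment above describes the whole life cycle of the per-node count. A minimal user-space model may make it easier to follow; the sketch below uses C11 atomics, the struct and function names and the max value are invented for illustration, and the pending_pwqs bookkeeping is left out. It corresponds to the lockless acquire in tryinc_node_nr_active() and the release in pwq_dec_nr_active() further down in this patch.

#include <stdatomic.h>
#include <stdbool.h>

/* simplified stand-in for struct wq_node_nr_active (illustration only) */
struct node_nr_active_model {
        int max;                /* per-node max_active */
        atomic_int nr;          /* per-node nr_active */
};

/* lockless acquisition, mirroring tryinc_node_nr_active() */
static bool model_tryinc(struct node_nr_active_model *nna)
{
        int old = atomic_load(&nna->nr);

        while (old < nna->max) {
                /* succeeds only if nobody raced the count past us */
                if (atomic_compare_exchange_weak(&nna->nr, &old, old + 1))
                        return true;
        }
        /* over budget: the real code queues the pwq on ->pending_pwqs here */
        return false;
}

/* release, mirroring the atomic_dec_return() in pwq_dec_nr_active() */
static bool model_dec_leaves_room(struct node_nr_active_model *nna)
{
        /* true when the new count is below max and a pending pwq could run */
        return atomic_fetch_sub(&nna->nr, 1) - 1 < nna->max;
}
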
 /*
  * The externally visible workqueue.  It relays the issued work items to
  * the appropriate worker_pool through its pool_workqueues.
@@ -298,10 +342,15 @@ struct workqueue_struct {
        struct worker           *rescuer;       /* MD: rescue worker */
 
        int                     nr_drainers;    /* WQ: drain in progress */
-       int                     saved_max_active; /* WQ: saved pwq max_active */
+
+       /* See alloc_workqueue() function comment for info on min/max_active */
+       int                     max_active;     /* WO: max active works */
+       int                     min_active;     /* WO: min active works */
+       int                     saved_max_active; /* WQ: saved max_active */
+       int                     saved_min_active; /* WQ: saved min_active */
 
        struct workqueue_attrs  *unbound_attrs; /* PW: only for unbound wqs */
-       struct pool_workqueue   *dfl_pwq;       /* PW: only for unbound wqs */
+       struct pool_workqueue __rcu *dfl_pwq;   /* PW: only for unbound wqs */
 
 #ifdef CONFIG_SYSFS
        struct wq_device        *wq_dev;        /* I: for sysfs interface */
@@ -323,6 +372,7 @@ struct workqueue_struct {
        /* hot fields used during command issue, aligned to cacheline */
        unsigned int            flags ____cacheline_aligned; /* WQ: WQ_* flags */
        struct pool_workqueue __percpu __rcu **cpu_pwq; /* I: per-cpu pwqs */
+       struct wq_node_nr_active *node_nr_active[]; /* I: per-node nr_active */
 };
 
 static struct kmem_cache *pwq_cache;
@@ -350,6 +400,8 @@ static const char *wq_affn_names[WQ_AFFN_NR_TYPES] = {
        [WQ_AFFN_SYSTEM]                = "system",
 };
 
+static bool wq_topo_initialized __read_mostly = false;
+
 /*
  * Per-cpu work items which run for longer than the following threshold are
  * automatically considered CPU intensive and excluded from concurrency
@@ -405,8 +457,13 @@ static bool wq_debug_force_rr_cpu = false;
 #endif
 module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);
 
+/* the BH worker pools */
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
+                                    bh_worker_pools);
+
 /* the per-cpu worker pools */
-static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
+                                    cpu_worker_pools);
 
 static DEFINE_IDR(worker_pool_idr);    /* PR: idr of all pools */
 
@@ -440,6 +497,10 @@ struct workqueue_struct *system_power_efficient_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_power_efficient_wq);
 struct workqueue_struct *system_freezable_power_efficient_wq __ro_after_init;
 EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
+struct workqueue_struct *system_bh_wq;
+EXPORT_SYMBOL_GPL(system_bh_wq);
+struct workqueue_struct *system_bh_highpri_wq;
+EXPORT_SYMBOL_GPL(system_bh_highpri_wq);
 
 static int worker_thread(void *__worker);
 static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
@@ -460,6 +521,11 @@ static void show_one_worker_pool(struct worker_pool *pool);
                         !lockdep_is_held(&wq_pool_mutex),              \
                         "RCU, wq->mutex or wq_pool_mutex should be held")
 
+#define for_each_bh_worker_pool(pool, cpu)                             \
+       for ((pool) = &per_cpu(bh_worker_pools, cpu)[0];                \
+            (pool) < &per_cpu(bh_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
+            (pool)++)
+
 #define for_each_cpu_worker_pool(pool, cpu)                            \
        for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0];               \
             (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
@@ -632,6 +698,36 @@ static int worker_pool_assign_id(struct worker_pool *pool)
        return ret;
 }
 
+static struct pool_workqueue __rcu **
+unbound_pwq_slot(struct workqueue_struct *wq, int cpu)
+{
+       if (cpu >= 0)
+               return per_cpu_ptr(wq->cpu_pwq, cpu);
+       else
+               return &wq->dfl_pwq;
+}
+
+/* @cpu < 0 for dfl_pwq */
+static struct pool_workqueue *unbound_pwq(struct workqueue_struct *wq, int cpu)
+{
+       return rcu_dereference_check(*unbound_pwq_slot(wq, cpu),
+                                    lockdep_is_held(&wq_pool_mutex) ||
+                                    lockdep_is_held(&wq->mutex));
+}
+
+/**
+ * unbound_effective_cpumask - effective cpumask of an unbound workqueue
+ * @wq: workqueue of interest
+ *
+ * @wq->unbound_attrs->cpumask contains the cpumask requested by the user which
+ * is masked with wq_unbound_cpumask to determine the effective cpumask. The
+ * default pwq is always mapped to the pool with the current effective cpumask.
+ */
+static struct cpumask *unbound_effective_cpumask(struct workqueue_struct *wq)
+{
+       return unbound_pwq(wq, -1)->pool->attrs->__pod_cpumask;
+}
+
 static unsigned int work_color_to_flags(int color)
 {
        return color << WORK_STRUCT_COLOR_SHIFT;
@@ -1118,6 +1214,14 @@ static bool kick_pool(struct worker_pool *pool)
        if (!need_more_worker(pool) || !worker)
                return false;
 
+       if (pool->flags & POOL_BH) {
+               if (pool->attrs->nice == HIGHPRI_NICE_LEVEL)
+                       raise_softirq_irqoff(HI_SOFTIRQ);
+               else
+                       raise_softirq_irqoff(TASKLET_SOFTIRQ);
+               return true;
+       }
+
        p = worker->task;
 
 #ifdef CONFIG_SMP
@@ -1401,6 +1505,74 @@ work_func_t wq_worker_last_func(struct task_struct *task)
        return worker->last_func;
 }
 
+/**
+ * wq_node_nr_active - Determine wq_node_nr_active to use
+ * @wq: workqueue of interest
+ * @node: NUMA node, can be %NUMA_NO_NODE
+ *
+ * Determine wq_node_nr_active to use for @wq on @node. Returns:
+ *
+ * - %NULL for per-cpu workqueues as they don't need to use shared nr_active.
+ *
+ * - node_nr_active[nr_node_ids] if @node is %NUMA_NO_NODE.
+ *
+ * - Otherwise, node_nr_active[@node].
+ */
+static struct wq_node_nr_active *wq_node_nr_active(struct workqueue_struct *wq,
+                                                  int node)
+{
+       if (!(wq->flags & WQ_UNBOUND))
+               return NULL;
+
+       if (node == NUMA_NO_NODE)
+               node = nr_node_ids;
+
+       return wq->node_nr_active[node];
+}
+
+/**
+ * wq_update_node_max_active - Update per-node max_actives to use
+ * @wq: workqueue to update
+ * @off_cpu: CPU that's going down, -1 if a CPU is not going down
+ *
+ * Update @wq->node_nr_active[]->max. @wq must be unbound. max_active is
+ * distributed among nodes in proportion to each node's number of online CPUs.
+ * The result is always between @wq->min_active and max_active.
+ */
+static void wq_update_node_max_active(struct workqueue_struct *wq, int off_cpu)
+{
+       struct cpumask *effective = unbound_effective_cpumask(wq);
+       int min_active = READ_ONCE(wq->min_active);
+       int max_active = READ_ONCE(wq->max_active);
+       int total_cpus, node;
+
+       lockdep_assert_held(&wq->mutex);
+
+       if (!wq_topo_initialized)
+               return;
+
+       if (off_cpu >= 0 && !cpumask_test_cpu(off_cpu, effective))
+               off_cpu = -1;
+
+       total_cpus = cpumask_weight_and(effective, cpu_online_mask);
+       if (off_cpu >= 0)
+               total_cpus--;
+
+       for_each_node(node) {
+               int node_cpus;
+
+               node_cpus = cpumask_weight_and(effective, cpumask_of_node(node));
+               if (off_cpu >= 0 && cpu_to_node(off_cpu) == node)
+                       node_cpus--;
+
+               wq_node_nr_active(wq, node)->max =
+                       clamp(DIV_ROUND_UP(max_active * node_cpus, total_cpus),
+                             min_active, max_active);
+       }
+
+       wq_node_nr_active(wq, NUMA_NO_NODE)->max = min_active;
+}
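
To make the proportional split above concrete with hypothetical numbers (not taken from this patch): with max_active = 16, min_active = 8, and an effective cpumask covering 6 online CPUs on node 0 and 2 on node 1, node 0 gets clamp(DIV_ROUND_UP(16 * 6, 8), 8, 16) = 12 and node 1 gets clamp(DIV_ROUND_UP(16 * 2, 8), 8, 16) = 8, while the NUMA_NO_NODE fallback slot is capped at min_active. A standalone sketch of the same arithmetic, with invented names:

/* illustrative helper mirroring the per-node max computation above */
static int node_max_active(int wq_max, int wq_min, int node_cpus, int total_cpus)
{
        int prop = (wq_max * node_cpus + total_cpus - 1) / total_cpus; /* DIV_ROUND_UP */

        if (prop < wq_min)
                return wq_min;
        if (prop > wq_max)
                return wq_max;
        return prop;
}

/* node_max_active(16, 8, 6, 8) == 12, node_max_active(16, 8, 2, 8) == 8 */
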
+
 /**
  * get_pwq - get an extra reference on the specified pool_workqueue
  * @pwq: pool_workqueue to get
@@ -1453,24 +1625,333 @@ static void put_pwq_unlocked(struct pool_workqueue *pwq)
        }
 }
 
-static void pwq_activate_inactive_work(struct work_struct *work)
+static bool pwq_is_empty(struct pool_workqueue *pwq)
 {
-       struct pool_workqueue *pwq = get_work_pwq(work);
+       return !pwq->nr_active && list_empty(&pwq->inactive_works);
+}
 
+static void __pwq_activate_work(struct pool_workqueue *pwq,
+                               struct work_struct *work)
+{
+       unsigned long *wdb = work_data_bits(work);
+
+       WARN_ON_ONCE(!(*wdb & WORK_STRUCT_INACTIVE));
        trace_workqueue_activate_work(work);
        if (list_empty(&pwq->pool->worklist))
                pwq->pool->watchdog_ts = jiffies;
        move_linked_works(work, &pwq->pool->worklist, NULL);
-       __clear_bit(WORK_STRUCT_INACTIVE_BIT, work_data_bits(work));
+       __clear_bit(WORK_STRUCT_INACTIVE_BIT, wdb);
+}
+
+/**
+ * pwq_activate_work - Activate a work item if inactive
+ * @pwq: pool_workqueue @work belongs to
+ * @work: work item to activate
+ *
+ * Returns %true if activated. %false if already active.
+ */
+static bool pwq_activate_work(struct pool_workqueue *pwq,
+                             struct work_struct *work)
+{
+       struct worker_pool *pool = pwq->pool;
+       struct wq_node_nr_active *nna;
+
+       lockdep_assert_held(&pool->lock);
+
+       if (!(*work_data_bits(work) & WORK_STRUCT_INACTIVE))
+               return false;
+
+       nna = wq_node_nr_active(pwq->wq, pool->node);
+       if (nna)
+               atomic_inc(&nna->nr);
+
        pwq->nr_active++;
+       __pwq_activate_work(pwq, work);
+       return true;
+}
+
+static bool tryinc_node_nr_active(struct wq_node_nr_active *nna)
+{
+       int max = READ_ONCE(nna->max);
+
+       while (true) {
+               int old, tmp;
+
+               old = atomic_read(&nna->nr);
+               if (old >= max)
+                       return false;
+               tmp = atomic_cmpxchg_relaxed(&nna->nr, old, old + 1);
+               if (tmp == old)
+                       return true;
+       }
+}
+
+/**
+ * pwq_tryinc_nr_active - Try to increment nr_active for a pwq
+ * @pwq: pool_workqueue of interest
+ * @fill: max_active may have increased, try to increase concurrency level
+ *
+ * Try to increment nr_active for @pwq. Returns %true if an nr_active count is
+ * successfully obtained. %false otherwise.
+ */
+static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq, bool fill)
+{
+       struct workqueue_struct *wq = pwq->wq;
+       struct worker_pool *pool = pwq->pool;
+       struct wq_node_nr_active *nna = wq_node_nr_active(wq, pool->node);
+       bool obtained = false;
+
+       lockdep_assert_held(&pool->lock);
+
+       if (!nna) {
+               /* BH or per-cpu workqueue, pwq->nr_active is sufficient */
+               obtained = pwq->nr_active < READ_ONCE(wq->max_active);
+               goto out;
+       }
+
+       if (unlikely(pwq->plugged))
+               return false;
+
+       /*
+        * Unbound workqueue uses per-node shared nr_active $nna. If @pwq is
+        * already waiting on $nna, pwq_dec_nr_active() will maintain the
+        * concurrency level. Don't jump the line.
+        *
+        * We need to ignore the pending test after max_active has increased as
+        * pwq_dec_nr_active() can only maintain the concurrency level but not
+        * increase it. This is indicated by @fill.
+        */
+       if (!list_empty(&pwq->pending_node) && likely(!fill))
+               goto out;
+
+       obtained = tryinc_node_nr_active(nna);
+       if (obtained)
+               goto out;
+
+       /*
+        * Lockless acquisition failed. Lock, add ourself to $nna->pending_pwqs
+        * and try again. The smp_mb() is paired with the implied memory barrier
+        * of atomic_dec_return() in pwq_dec_nr_active() to ensure that either
+        * we see the decremented $nna->nr or they see non-empty
+        * $nna->pending_pwqs.
+        */
+       raw_spin_lock(&nna->lock);
+
+       if (list_empty(&pwq->pending_node))
+               list_add_tail(&pwq->pending_node, &nna->pending_pwqs);
+       else if (likely(!fill))
+               goto out_unlock;
+
+       smp_mb();
+
+       obtained = tryinc_node_nr_active(nna);
+
+       /*
+        * If @fill, @pwq might have already been pending. Being spuriously
+        * pending in cold paths doesn't affect anything. Let's leave it be.
+        */
+       if (obtained && likely(!fill))
+               list_del_init(&pwq->pending_node);
+
+out_unlock:
+       raw_spin_unlock(&nna->lock);
+out:
+       if (obtained)
+               pwq->nr_active++;
+       return obtained;
+}
+
+/**
+ * pwq_activate_first_inactive - Activate the first inactive work item on a pwq
+ * @pwq: pool_workqueue of interest
+ * @fill: max_active may have increased, try to increase concurrency level
+ *
+ * Activate the first inactive work item of @pwq if available and allowed by
+ * max_active limit.
+ *
+ * Returns %true if an inactive work item has been activated. %false if no
+ * inactive work item is found or max_active limit is reached.
+ */
+static bool pwq_activate_first_inactive(struct pool_workqueue *pwq, bool fill)
+{
+       struct work_struct *work =
+               list_first_entry_or_null(&pwq->inactive_works,
+                                        struct work_struct, entry);
+
+       if (work && pwq_tryinc_nr_active(pwq, fill)) {
+               __pwq_activate_work(pwq, work);
+               return true;
+       } else {
+               return false;
+       }
+}
+
+/**
+ * unplug_oldest_pwq - restart an oldest plugged pool_workqueue
+ * @wq: workqueue_struct to be restarted
+ *
+ * pwq's are linked into wq->pwqs with the oldest first. For ordered
+ * workqueues, only the oldest pwq is unplugged, the others are plugged to
+ * suspend execution until the oldest one is drained. When this happens, the
+ * next oldest one (first plugged pwq in iteration) will be unplugged to
+ * restart work item execution to ensure proper work item ordering.
+ *
+ *    dfl_pwq --------------+     [P] - plugged
+ *                          |
+ *                          v
+ *    pwqs -> A -> B [P] -> C [P] (newest)
+ *            |    |        |
+ *            1    3        5
+ *            |    |        |
+ *            2    4        6
+ */
+static void unplug_oldest_pwq(struct workqueue_struct *wq)
+{
+       struct pool_workqueue *pwq;
+
+       lockdep_assert_held(&wq->mutex);
+
+       /* Caller should make sure that pwqs isn't empty before calling */
+       pwq = list_first_entry_or_null(&wq->pwqs, struct pool_workqueue,
+                                      pwqs_node);
+       raw_spin_lock_irq(&pwq->pool->lock);
+       if (pwq->plugged) {
+               pwq->plugged = false;
+               if (pwq_activate_first_inactive(pwq, true))
+                       kick_pool(pwq->pool);
+       }
+       raw_spin_unlock_irq(&pwq->pool->lock);
+}
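
From a caller's point of view, the plugging above is what allows the series' headline change: an ordered workqueue can now have its unbound attributes (e.g. its cpumask) updated without losing the one-at-a-time, FIFO guarantee. A hedged usage sketch with hypothetical work items and the standard workqueue API:

#include <linux/workqueue.h>

static struct workqueue_struct *my_ordered_wq;  /* hypothetical */

static void step1_fn(struct work_struct *work) { /* ... */ }
static void step2_fn(struct work_struct *work) { /* ... */ }

static DECLARE_WORK(step1, step1_fn);
static DECLARE_WORK(step2, step2_fn);

static int my_init(void)
{
        my_ordered_wq = alloc_ordered_workqueue("my_ordered", 0);
        if (!my_ordered_wq)
                return -ENOMEM;

        queue_work(my_ordered_wq, &step1);
        /*
         * Even if the workqueue's unbound cpumask changes here, the new pwq
         * stays plugged until the old pwq drains, so step2 runs after step1.
         */
        queue_work(my_ordered_wq, &step2);
        return 0;
}
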
+
+/**
+ * node_activate_pending_pwq - Activate a pending pwq on a wq_node_nr_active
+ * @nna: wq_node_nr_active to activate a pending pwq for
+ * @caller_pool: worker_pool the caller is locking
+ *
+ * Activate a pwq in @nna->pending_pwqs. Called with @caller_pool locked.
+ * @caller_pool may be unlocked and relocked to lock other worker_pools.
+ */
+static void node_activate_pending_pwq(struct wq_node_nr_active *nna,
+                                     struct worker_pool *caller_pool)
+{
+       struct worker_pool *locked_pool = caller_pool;
+       struct pool_workqueue *pwq;
+       struct work_struct *work;
+
+       lockdep_assert_held(&caller_pool->lock);
+
+       raw_spin_lock(&nna->lock);
+retry:
+       pwq = list_first_entry_or_null(&nna->pending_pwqs,
+                                      struct pool_workqueue, pending_node);
+       if (!pwq)
+               goto out_unlock;
+
+       /*
+        * If @pwq is for a different pool than @locked_pool, we need to lock
+        * @pwq->pool->lock. Let's trylock first. If unsuccessful, do the unlock
+        * / lock dance. For that, we also need to release @nna->lock as it's
+        * nested inside pool locks.
+        */
+       if (pwq->pool != locked_pool) {
+               raw_spin_unlock(&locked_pool->lock);
+               locked_pool = pwq->pool;
+               if (!raw_spin_trylock(&locked_pool->lock)) {
+                       raw_spin_unlock(&nna->lock);
+                       raw_spin_lock(&locked_pool->lock);
+                       raw_spin_lock(&nna->lock);
+                       goto retry;
+               }
+       }
+
+       /*
+        * $pwq may not have any inactive work items due to e.g. cancellations.
+        * Drop it from pending_pwqs and see if there's another one.
+        */
+       work = list_first_entry_or_null(&pwq->inactive_works,
+                                       struct work_struct, entry);
+       if (!work) {
+               list_del_init(&pwq->pending_node);
+               goto retry;
+       }
+
+       /*
+        * Acquire an nr_active count and activate the inactive work item. If
+        * $pwq still has inactive work items, rotate it to the end of the
+        * pending_pwqs so that we round-robin through them. This means that
+        * inactive work items are not activated in queueing order which is fine
+        * given that there has never been any ordering across different pwqs.
+        */
+       if (likely(tryinc_node_nr_active(nna))) {
+               pwq->nr_active++;
+               __pwq_activate_work(pwq, work);
+
+               if (list_empty(&pwq->inactive_works))
+                       list_del_init(&pwq->pending_node);
+               else
+                       list_move_tail(&pwq->pending_node, &nna->pending_pwqs);
+
+               /* if activating a foreign pool, make sure it's running */
+               if (pwq->pool != caller_pool)
+                       kick_pool(pwq->pool);
+       }
+
+out_unlock:
+       raw_spin_unlock(&nna->lock);
+       if (locked_pool != caller_pool) {
+               raw_spin_unlock(&locked_pool->lock);
+               raw_spin_lock(&caller_pool->lock);
+       }
 }
 
-static void pwq_activate_first_inactive(struct pool_workqueue *pwq)
+/**
+ * pwq_dec_nr_active - Retire an active count
+ * @pwq: pool_workqueue of interest
+ *
+ * Decrement @pwq's nr_active and try to activate the first inactive work item.
+ * For unbound workqueues, this function may temporarily drop @pwq->pool->lock.
+ */
+static void pwq_dec_nr_active(struct pool_workqueue *pwq)
 {
-       struct work_struct *work = list_first_entry(&pwq->inactive_works,
-                                                   struct work_struct, entry);
+       struct worker_pool *pool = pwq->pool;
+       struct wq_node_nr_active *nna = wq_node_nr_active(pwq->wq, pool->node);
 
-       pwq_activate_inactive_work(work);
+       lockdep_assert_held(&pool->lock);
+
+       /*
+        * @pwq->nr_active should be decremented for both percpu and unbound
+        * workqueues.
+        */
+       pwq->nr_active--;
+
+       /*
+        * For a percpu workqueue, it's simple. Just need to kick the first
+        * inactive work item on @pwq itself.
+        */
+       if (!nna) {
+               pwq_activate_first_inactive(pwq, false);
+               return;
+       }
+
+       /*
+        * If @pwq is for an unbound workqueue, it's more complicated because
+        * multiple pwqs and pools may be sharing the nr_active count. When a
+        * pwq needs to wait for an nr_active count, it puts itself on
+        * $nna->pending_pwqs. The following atomic_dec_return()'s implied
+        * memory barrier is paired with smp_mb() in pwq_tryinc_nr_active() to
+        * guarantee that either we see non-empty pending_pwqs or they see
+        * decremented $nna->nr.
+        *
+        * $nna->max may change as CPUs come online/offline and @pwq->wq's
+        * max_active gets updated. However, it is guaranteed to be equal to or
+        * larger than @pwq->wq->min_active which is above zero unless freezing.
+        * This maintains the forward progress guarantee.
+        */
+       if (atomic_dec_return(&nna->nr) >= READ_ONCE(nna->max))
+               return;
+
+       if (!list_empty(&nna->pending_pwqs))
+               node_activate_pending_pwq(nna, pool);
 }
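
The barrier pairing described above is the usual publish-then-check pattern. A simplified user-space model (C11 atomics, invented names; the real code keeps a list under nna->lock rather than a flag): the slow path publishes its presence before re-reading the counter, and the release path decrements with a full barrier before checking for waiters, so at least one side observes the other and no pending pwq is left stranded.

#include <stdatomic.h>
#include <stdbool.h>

static atomic_int nr;                   /* models nna->nr */
static atomic_bool pending_nonempty;    /* models !list_empty(&nna->pending_pwqs) */
static int max_active = 1;              /* illustrative */

/* models the slow path of pwq_tryinc_nr_active() */
static bool enqueue_then_retry(void)
{
        atomic_store(&pending_nonempty, true);          /* add to pending_pwqs */
        atomic_thread_fence(memory_order_seq_cst);      /* models smp_mb() */
        return atomic_load(&nr) < max_active;           /* retry tryinc */
}

/* models pwq_dec_nr_active() */
static bool dec_then_check(void)
{
        /* the fetch_sub is a full barrier, like atomic_dec_return() */
        if (atomic_fetch_sub(&nr, 1) - 1 >= max_active)
                return false;
        return atomic_load(&pending_nonempty);          /* a waiter exists, activate it */
}
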
 
 /**
@@ -1481,6 +1962,11 @@ static void pwq_activate_first_inactive(struct pool_workqueue *pwq)
  * A work either has completed or is removed from pending queue,
  * decrement nr_in_flight of its pwq and handle workqueue flushing.
  *
+ * NOTE:
+ * For unbound workqueues, this function may temporarily drop @pwq->pool->lock
+ * and thus should be called after all other state updates for the in-flight
+ * work item are complete.
+ *
  * CONTEXT:
  * raw_spin_lock_irq(pool->lock).
  */
@@ -1488,14 +1974,8 @@ static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, unsigned long work_
 {
        int color = get_work_color(work_data);
 
-       if (!(work_data & WORK_STRUCT_INACTIVE)) {
-               pwq->nr_active--;
-               if (!list_empty(&pwq->inactive_works)) {
-                       /* one down, submit an inactive one */
-                       if (pwq->nr_active < pwq->max_active)
-                               pwq_activate_first_inactive(pwq);
-               }
-       }
+       if (!(work_data & WORK_STRUCT_INACTIVE))
+               pwq_dec_nr_active(pwq);
 
        pwq->nr_in_flight[color]--;
 
@@ -1595,6 +2075,8 @@ static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
         */
        pwq = get_work_pwq(work);
        if (pwq && pwq->pool == pool) {
+               unsigned long work_data;
+
                debug_work_deactivate(work);
 
                /*
@@ -1608,15 +2090,20 @@ static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
                 * management later on and cause stall.  Make sure the work
                 * item is activated before grabbing.
                 */
-               if (*work_data_bits(work) & WORK_STRUCT_INACTIVE)
-                       pwq_activate_inactive_work(work);
+               pwq_activate_work(pwq, work);
 
                list_del_init(&work->entry);
-               pwq_dec_nr_in_flight(pwq, *work_data_bits(work));
 
-               /* work->data points to pwq iff queued, point to pool */
+               /*
+                * work->data points to pwq iff queued. Let's point to pool. As
+                * this destroys work->data needed by the next step, stash it.
+                */
+               work_data = *work_data_bits(work);
                set_work_pool_and_keep_pending(work, pool->id);
 
+               /* must be the last step, see the function comment */
+               pwq_dec_nr_in_flight(pwq, work_data);
+
                raw_spin_unlock(&pool->lock);
                rcu_read_unlock();
                return 1;
@@ -1793,12 +2280,16 @@ retry:
        pwq->nr_in_flight[pwq->work_color]++;
        work_flags = work_color_to_flags(pwq->work_color);
 
-       if (likely(pwq->nr_active < pwq->max_active)) {
+       /*
+        * Limit the number of concurrently active work items to max_active.
+        * @work must also queue behind existing inactive work items to maintain
+        * ordering when max_active changes. See wq_adjust_max_active().
+        */
+       if (list_empty(&pwq->inactive_works) && pwq_tryinc_nr_active(pwq, false)) {
                if (list_empty(&pool->worklist))
                        pool->watchdog_ts = jiffies;
 
                trace_workqueue_activate_work(work);
-               pwq->nr_active++;
                insert_work(pwq, work, &pool->worklist, work_flags);
                kick_pool(pool);
        } else {
@@ -1958,10 +2449,18 @@ static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
        dwork->cpu = cpu;
        timer->expires = jiffies + delay;
 
-       if (unlikely(cpu != WORK_CPU_UNBOUND))
+       if (housekeeping_enabled(HK_TYPE_TIMER)) {
+               /* If the current cpu is a housekeeping cpu, use it. */
+               cpu = smp_processor_id();
+               if (!housekeeping_test_cpu(cpu, HK_TYPE_TIMER))
+                       cpu = housekeeping_any_cpu(HK_TYPE_TIMER);
                add_timer_on(timer, cpu);
-       else
-               add_timer(timer);
+       } else {
+               if (likely(cpu == WORK_CPU_UNBOUND))
+                       add_timer(timer);
+               else
+                       add_timer_on(timer, cpu);
+       }
 }
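
The hunk above only changes where the delay timer is armed; callers do not change. A minimal, hypothetical caller for reference: on a kernel using CPU isolation (e.g. nohz_full), the timer below now fires on a housekeeping CPU rather than on whichever CPU happens to call queue_delayed_work(), while the work item itself is still placed by the normal workqueue rules.

#include <linux/workqueue.h>

static void my_poll_fn(struct work_struct *work);       /* hypothetical */
static DECLARE_DELAYED_WORK(my_poll_work, my_poll_fn);

static void my_poll_fn(struct work_struct *work)
{
        /* ... periodic polling work ... */

        /*
         * Re-arm. With the change above, the 100ms timer is armed on a
         * housekeeping CPU when HK_TYPE_TIMER housekeeping is enabled.
         */
        queue_delayed_work(system_wq, &my_poll_work, msecs_to_jiffies(100));
}
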
 
 /**
@@ -2100,19 +2599,21 @@ static cpumask_t *pool_allowed_cpus(struct worker_pool *pool)
  * cpu-[un]hotplugs.
  */
 static void worker_attach_to_pool(struct worker *worker,
-                                  struct worker_pool *pool)
+                                 struct worker_pool *pool)
 {
        mutex_lock(&wq_pool_attach_mutex);
 
        /*
-        * The wq_pool_attach_mutex ensures %POOL_DISASSOCIATED remains
-        * stable across this function.  See the comments above the flag
-        * definition for details.
+        * The wq_pool_attach_mutex ensures %POOL_DISASSOCIATED remains stable
+        * across this function. See the comments above the flag definition for
+        * details. BH workers are, while per-CPU, always DISASSOCIATED.
         */
-       if (pool->flags & POOL_DISASSOCIATED)
+       if (pool->flags & POOL_DISASSOCIATED) {
                worker->flags |= WORKER_UNBOUND;
-       else
+       } else {
+               WARN_ON_ONCE(pool->flags & POOL_BH);
                kthread_set_per_cpu(worker->task, pool->cpu);
+       }
 
        if (worker->rescue_wq)
                set_cpus_allowed_ptr(worker->task, pool_allowed_cpus(pool));
@@ -2136,6 +2637,9 @@ static void worker_detach_from_pool(struct worker *worker)
        struct worker_pool *pool = worker->pool;
        struct completion *detach_completion = NULL;
 
+       /* there is one permanent BH worker per CPU which should never detach */
+       WARN_ON_ONCE(pool->flags & POOL_BH);
+
        mutex_lock(&wq_pool_attach_mutex);
 
        kthread_set_per_cpu(worker->task, -1);
@@ -2187,27 +2691,29 @@ static struct worker *create_worker(struct worker_pool *pool)
 
        worker->id = id;
 
-       if (pool->cpu >= 0)
-               snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
-                        pool->attrs->nice < 0  ? "H" : "");
-       else
-               snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
-
-       worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
-                                             "kworker/%s", id_buf);
-       if (IS_ERR(worker->task)) {
-               if (PTR_ERR(worker->task) == -EINTR) {
-                       pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
-                              id_buf);
-               } else {
-                       pr_err_once("workqueue: Failed to create a worker thread: %pe",
-                                   worker->task);
+       if (!(pool->flags & POOL_BH)) {
+               if (pool->cpu >= 0)
+                       snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
+                                pool->attrs->nice < 0  ? "H" : "");
+               else
+                       snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
+
+               worker->task = kthread_create_on_node(worker_thread, worker,
+                                       pool->node, "kworker/%s", id_buf);
+               if (IS_ERR(worker->task)) {
+                       if (PTR_ERR(worker->task) == -EINTR) {
+                               pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
+                                      id_buf);
+                       } else {
+                               pr_err_once("workqueue: Failed to create a worker thread: %pe",
+                                           worker->task);
+                       }
+                       goto fail;
                }
-               goto fail;
-       }
 
-       set_user_nice(worker->task, pool->attrs->nice);
-       kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
+               set_user_nice(worker->task, pool->attrs->nice);
+               kthread_bind_mask(worker->task, pool_allowed_cpus(pool));
+       }
 
        /* successful, attach the worker to the pool */
        worker_attach_to_pool(worker, pool);
@@ -2217,14 +2723,14 @@ static struct worker *create_worker(struct worker_pool *pool)
 
        worker->pool->nr_workers++;
        worker_enter_idle(worker);
-       kick_pool(pool);
 
        /*
         * @worker is waiting on a completion in kthread() and will trigger hung
-        * check if not woken up soon. As kick_pool() might not have waken it
-        * up, wake it up explicitly once more.
+        * check if not woken up soon. As kick_pool() is noop if @pool is empty,
+        * wake it up explicitly.
         */
-       wake_up_process(worker->task);
+       if (worker->task)
+               wake_up_process(worker->task);
 
        raw_spin_unlock_irq(&pool->lock);
 
@@ -2543,6 +3049,7 @@ __acquires(&pool->lock)
        struct pool_workqueue *pwq = get_work_pwq(work);
        struct worker_pool *pool = worker->pool;
        unsigned long work_data;
+       int lockdep_start_depth, rcu_start_depth;
 #ifdef CONFIG_LOCKDEP
        /*
         * It is permissible to free the struct work_struct from
@@ -2565,7 +3072,8 @@ __acquires(&pool->lock)
        worker->current_work = work;
        worker->current_func = work->func;
        worker->current_pwq = pwq;
-       worker->current_at = worker->task->se.sum_exec_runtime;
+       if (worker->task)
+               worker->current_at = worker->task->se.sum_exec_runtime;
        work_data = *work_data_bits(work);
        worker->current_color = get_work_color(work_data);
 
@@ -2605,6 +3113,8 @@ __acquires(&pool->lock)
        pwq->stats[PWQ_STAT_STARTED]++;
        raw_spin_unlock_irq(&pool->lock);
 
+       rcu_start_depth = rcu_preempt_depth();
+       lockdep_start_depth = lockdep_depth(current);
        lock_map_acquire(&pwq->wq->lockdep_map);
        lock_map_acquire(&lockdep_map);
        /*
@@ -2640,12 +3150,15 @@ __acquires(&pool->lock)
        lock_map_release(&lockdep_map);
        lock_map_release(&pwq->wq->lockdep_map);
 
-       if (unlikely(in_atomic() || lockdep_depth(current) > 0 ||
-                    rcu_preempt_depth() > 0)) {
-               pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d/%d\n"
-                      "     last function: %ps\n",
-                      current->comm, preempt_count(), rcu_preempt_depth(),
-                      task_pid_nr(current), worker->current_func);
+       if (unlikely((worker->task && in_atomic()) ||
+                    lockdep_depth(current) != lockdep_start_depth ||
+                    rcu_preempt_depth() != rcu_start_depth)) {
+               pr_err("BUG: workqueue leaked atomic, lock or RCU: %s[%d]\n"
+                      "     preempt=0x%08x lock=%d->%d RCU=%d->%d workfn=%ps\n",
+                      current->comm, task_pid_nr(current), preempt_count(),
+                      lockdep_start_depth, lockdep_depth(current),
+                      rcu_start_depth, rcu_preempt_depth(),
+                      worker->current_func);
                debug_show_held_locks(current);
                dump_stack();
        }
@@ -2658,7 +3171,8 @@ __acquires(&pool->lock)
         * stop_machine. At the same time, report a quiescent RCU state so
         * the same condition doesn't freeze RCU.
         */
-       cond_resched();
+       if (worker->task)
+               cond_resched();
 
        raw_spin_lock_irq(&pool->lock);
 
@@ -2678,6 +3192,8 @@ __acquires(&pool->lock)
        worker->current_func = NULL;
        worker->current_pwq = NULL;
        worker->current_color = INT_MAX;
+
+       /* must be the last step, see the function comment */
        pwq_dec_nr_in_flight(pwq, work_data);
 }
 
@@ -2939,6 +3455,61 @@ repeat:
        goto repeat;
 }
 
+static void bh_worker(struct worker *worker)
+{
+       struct worker_pool *pool = worker->pool;
+       int nr_restarts = BH_WORKER_RESTARTS;
+       unsigned long end = jiffies + BH_WORKER_JIFFIES;
+
+       raw_spin_lock_irq(&pool->lock);
+       worker_leave_idle(worker);
+
+       /*
+        * This function follows the structure of worker_thread(). See there for
+        * explanations on each step.
+        */
+       if (!need_more_worker(pool))
+               goto done;
+
+       WARN_ON_ONCE(!list_empty(&worker->scheduled));
+       worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
+
+       do {
+               struct work_struct *work =
+                       list_first_entry(&pool->worklist,
+                                        struct work_struct, entry);
+
+               if (assign_work(work, worker, NULL))
+                       process_scheduled_works(worker);
+       } while (keep_working(pool) &&
+                --nr_restarts && time_before(jiffies, end));
+
+       worker_set_flags(worker, WORKER_PREP);
+done:
+       worker_enter_idle(worker);
+       kick_pool(pool);
+       raw_spin_unlock_irq(&pool->lock);
+}
+
+/*
+ * TODO: Convert all tasklet users to workqueue and use softirq directly.
+ *
+ * This is currently called from tasklet[_hi]action() and thus is also called
+ * whenever there are tasklets to run. Let's do an early exit if there's nothing
+ * queued. Once conversion from tasklet is complete, the need_more_worker() test
+ * can be dropped.
+ *
+ * After full conversion, we'll add worker->softirq_action, directly use the
+ * softirq action and obtain the worker pointer from the softirq_action pointer.
+ */
+void workqueue_softirq_action(bool highpri)
+{
+       struct worker_pool *pool =
+               &per_cpu(bh_worker_pools, smp_processor_id())[highpri];
+       if (need_more_worker(pool))
+               bh_worker(list_first_entry(&pool->workers, struct worker, node));
+}
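
To users, the BH pools and the workqueue_softirq_action() plumbing above appear as ordinary workqueues whose work items execute in softirq context on the queueing CPU, intended as a tasklet replacement. A hedged sketch using the system_bh_wq declared earlier in this patch; the handler, work item and interrupt handler names are hypothetical.

#include <linux/interrupt.h>
#include <linux/workqueue.h>

/* runs in softirq (BH) context, so it must not sleep */
static void my_bh_handler(struct work_struct *work)
{
        /* ... short, non-sleeping bottom-half processing ... */
}

static DECLARE_WORK(my_bh_work, my_bh_handler);

static irqreturn_t my_irq(int irq, void *data)
{
        /* where tasklet_schedule() used to go: queue onto the per-CPU BH pool */
        queue_work(system_bh_wq, &my_bh_work);
        return IRQ_HANDLED;
}
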
+
 /**
  * check_flush_dependency - check for flush dependency sanity
  * @target_wq: workqueue being flushed
@@ -3011,6 +3582,7 @@ static void insert_wq_barrier(struct pool_workqueue *pwq,
                              struct wq_barrier *barr,
                              struct work_struct *target, struct worker *worker)
 {
+       static __maybe_unused struct lock_class_key bh_key, thr_key;
        unsigned int work_flags = 0;
        unsigned int work_color;
        struct list_head *head;
@@ -3020,15 +3592,20 @@ static void insert_wq_barrier(struct pool_workqueue *pwq,
         * as we know for sure that this will not trigger any of the
         * checks and call back into the fixup functions where we
         * might deadlock.
+        *
+        * BH and threaded workqueues need separate lockdep keys to avoid
+        * spuriously triggering "inconsistent {SOFTIRQ-ON-W} -> {IN-SOFTIRQ-W}
+        * usage".
         */
-       INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
+       INIT_WORK_ONSTACK_KEY(&barr->work, wq_barrier_func,
+                             (pwq->wq->flags & WQ_BH) ? &bh_key : &thr_key);
        __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
 
        init_completion_map(&barr->done, &target->lockdep_map);
 
        barr->task = current;
 
-       /* The barrier work item does not participate in pwq->nr_active. */
+       /* The barrier work item does not participate in nr_active. */
        work_flags |= WORK_STRUCT_INACTIVE;
 
        /*
@@ -3125,6 +3702,35 @@ static bool flush_workqueue_prep_pwqs(struct workqueue_struct *wq,
        return wait;
 }
 
+static void touch_wq_lockdep_map(struct workqueue_struct *wq)
+{
+#ifdef CONFIG_LOCKDEP
+       if (wq->flags & WQ_BH)
+               local_bh_disable();
+
+       lock_map_acquire(&wq->lockdep_map);
+       lock_map_release(&wq->lockdep_map);
+
+       if (wq->flags & WQ_BH)
+               local_bh_enable();
+#endif
+}
+
+static void touch_work_lockdep_map(struct work_struct *work,
+                                  struct workqueue_struct *wq)
+{
+#ifdef CONFIG_LOCKDEP
+       if (wq->flags & WQ_BH)
+               local_bh_disable();
+
+       lock_map_acquire(&work->lockdep_map);
+       lock_map_release(&work->lockdep_map);
+
+       if (wq->flags & WQ_BH)
+               local_bh_enable();
+#endif
+}
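
One practical consequence of touching the lockdep maps with BH disabled is that flushing BH work items from process context does not produce spurious SOFTIRQ-ON-W reports. Reusing the hypothetical my_bh_work from the sketch after workqueue_softirq_action(), a teardown path might simply do:

/* hypothetical teardown path, running in process context */
static void my_driver_stop(void)
{
        /* sleeps until any pending/in-flight execution of my_bh_work finishes */
        flush_work(&my_bh_work);
}
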
+
 /**
  * __flush_workqueue - ensure that any scheduled work has run to completion.
  * @wq: workqueue to flush
@@ -3144,8 +3750,7 @@ void __flush_workqueue(struct workqueue_struct *wq)
        if (WARN_ON(!wq_online))
                return;
 
-       lock_map_acquire(&wq->lockdep_map);
-       lock_map_release(&wq->lockdep_map);
+       touch_wq_lockdep_map(wq);
 
        mutex_lock(&wq->mutex);
 
@@ -3317,7 +3922,7 @@ reflush:
                bool drained;
 
                raw_spin_lock_irq(&pwq->pool->lock);
-               drained = !pwq->nr_active && list_empty(&pwq->inactive_works);
+               drained = pwq_is_empty(pwq);
                raw_spin_unlock_irq(&pwq->pool->lock);
 
                if (drained)
@@ -3344,6 +3949,7 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
        struct worker *worker = NULL;
        struct worker_pool *pool;
        struct pool_workqueue *pwq;
+       struct workqueue_struct *wq;
 
        might_sleep();
 
@@ -3367,11 +3973,14 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
                pwq = worker->current_pwq;
        }
 
-       check_flush_dependency(pwq->wq, work);
+       wq = pwq->wq;
+       check_flush_dependency(wq, work);
 
        insert_wq_barrier(pwq, barr, work, worker);
        raw_spin_unlock_irq(&pool->lock);
 
+       touch_work_lockdep_map(work, wq);
+
        /*
         * Force a lock recursion deadlock when using flush_work() inside a
         * single-threaded or rescuer equipped workqueue.
@@ -3381,11 +3990,9 @@ static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
         * workqueues the deadlock happens when the rescuer stalls, blocking
         * forward progress.
         */
-       if (!from_cancel &&
-           (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)) {
-               lock_map_acquire(&pwq->wq->lockdep_map);
-               lock_map_release(&pwq->wq->lockdep_map);
-       }
+       if (!from_cancel && (wq->saved_max_active == 1 || wq->rescuer))
+               touch_wq_lockdep_map(wq);
+
        rcu_read_unlock();
        return true;
 already_gone:
@@ -3404,9 +4011,6 @@ static bool __flush_work(struct work_struct *work, bool from_cancel)
        if (WARN_ON(!work->func))
                return false;
 
-       lock_map_acquire(&work->lockdep_map);
-       lock_map_release(&work->lockdep_map);
-
        if (start_flush_work(work, &barr, from_cancel)) {
                wait_for_completion(&barr.done);
                destroy_work_on_stack(&barr.work);
@@ -3928,11 +4532,66 @@ static void wq_free_lockdep(struct workqueue_struct *wq)
 }
 #endif
 
+static void free_node_nr_active(struct wq_node_nr_active **nna_ar)
+{
+       int node;
+
+       for_each_node(node) {
+               kfree(nna_ar[node]);
+               nna_ar[node] = NULL;
+       }
+
+       kfree(nna_ar[nr_node_ids]);
+       nna_ar[nr_node_ids] = NULL;
+}
+
+static void init_node_nr_active(struct wq_node_nr_active *nna)
+{
+       nna->max = WQ_DFL_MIN_ACTIVE;
+       atomic_set(&nna->nr, 0);
+       raw_spin_lock_init(&nna->lock);
+       INIT_LIST_HEAD(&nna->pending_pwqs);
+}
+
+/*
+ * Each node's nr_active counter will be accessed mostly from its own node and
+ * should be allocated in the node.
+ */
+static int alloc_node_nr_active(struct wq_node_nr_active **nna_ar)
+{
+       struct wq_node_nr_active *nna;
+       int node;
+
+       for_each_node(node) {
+               nna = kzalloc_node(sizeof(*nna), GFP_KERNEL, node);
+               if (!nna)
+                       goto err_free;
+               init_node_nr_active(nna);
+               nna_ar[node] = nna;
+       }
+
+       /* [nr_node_ids] is used as the fallback */
+       nna = kzalloc_node(sizeof(*nna), GFP_KERNEL, NUMA_NO_NODE);
+       if (!nna)
+               goto err_free;
+       init_node_nr_active(nna);
+       nna_ar[nr_node_ids] = nna;
+
+       return 0;
+
+err_free:
+       free_node_nr_active(nna_ar);
+       return -ENOMEM;
+}
+
 static void rcu_free_wq(struct rcu_head *rcu)
 {
        struct workqueue_struct *wq =
                container_of(rcu, struct workqueue_struct, rcu);
 
+       if (wq->flags & WQ_UNBOUND)
+               free_node_nr_active(wq->node_nr_active);
+
        wq_free_lockdep(wq);
        free_percpu(wq->cpu_pwq);
        free_workqueue_attrs(wq->unbound_attrs);
@@ -4122,6 +4781,13 @@ static void pwq_release_workfn(struct kthread_work *work)
                mutex_lock(&wq->mutex);
                list_del_rcu(&pwq->pwqs_node);
                is_last = list_empty(&wq->pwqs);
+
+               /*
+                * For an ordered workqueue with a plugged dfl_pwq, restart it now.
+                */
+               if (!is_last && (wq->flags & __WQ_ORDERED))
+                       unplug_oldest_pwq(wq);
+
                mutex_unlock(&wq->mutex);
        }
 
@@ -4131,6 +4797,15 @@ static void pwq_release_workfn(struct kthread_work *work)
                mutex_unlock(&wq_pool_mutex);
        }
 
+       if (!list_empty(&pwq->pending_node)) {
+               struct wq_node_nr_active *nna =
+                       wq_node_nr_active(pwq->wq, pwq->pool->node);
+
+               raw_spin_lock_irq(&nna->lock);
+               list_del_init(&pwq->pending_node);
+               raw_spin_unlock_irq(&nna->lock);
+       }
+
        call_rcu(&pwq->rcu, rcu_free_pwq);
 
        /*
@@ -4143,50 +4818,6 @@ static void pwq_release_workfn(struct kthread_work *work)
        }
 }
 
-/**
- * pwq_adjust_max_active - update a pwq's max_active to the current setting
- * @pwq: target pool_workqueue
- *
- * If @pwq isn't freezing, set @pwq->max_active to the associated
- * workqueue's saved_max_active and activate inactive work items
- * accordingly.  If @pwq is freezing, clear @pwq->max_active to zero.
- */
-static void pwq_adjust_max_active(struct pool_workqueue *pwq)
-{
-       struct workqueue_struct *wq = pwq->wq;
-       bool freezable = wq->flags & WQ_FREEZABLE;
-       unsigned long flags;
-
-       /* for @wq->saved_max_active */
-       lockdep_assert_held(&wq->mutex);
-
-       /* fast exit for non-freezable wqs */
-       if (!freezable && pwq->max_active == wq->saved_max_active)
-               return;
-
-       /* this function can be called during early boot w/ irq disabled */
-       raw_spin_lock_irqsave(&pwq->pool->lock, flags);
-
-       /*
-        * During [un]freezing, the caller is responsible for ensuring that
-        * this function is called at least once after @workqueue_freezing
-        * is updated and visible.
-        */
-       if (!freezable || !workqueue_freezing) {
-               pwq->max_active = wq->saved_max_active;
-
-               while (!list_empty(&pwq->inactive_works) &&
-                      pwq->nr_active < pwq->max_active)
-                       pwq_activate_first_inactive(pwq);
-
-               kick_pool(pwq->pool);
-       } else {
-               pwq->max_active = 0;
-       }
-
-       raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
-}
-
 /* initialize newly allocated @pwq which is associated with @wq and @pool */
 static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
                     struct worker_pool *pool)
@@ -4200,6 +4831,7 @@ static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
        pwq->flush_color = -1;
        pwq->refcnt = 1;
        INIT_LIST_HEAD(&pwq->inactive_works);
+       INIT_LIST_HEAD(&pwq->pending_node);
        INIT_LIST_HEAD(&pwq->pwqs_node);
        INIT_LIST_HEAD(&pwq->mayday_node);
        kthread_init_work(&pwq->release_work, pwq_release_workfn);
@@ -4219,11 +4851,8 @@ static void link_pwq(struct pool_workqueue *pwq)
        /* set the matching work_color */
        pwq->work_color = wq->work_color;
 
-       /* sync max_active to the current setting */
-       pwq_adjust_max_active(pwq);
-
        /* link in @pwq */
-       list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
+       list_add_tail_rcu(&pwq->pwqs_node, &wq->pwqs);
 }
 
 /* obtain a pool matching @attr and create a pwq associating the pool and @wq */
@@ -4290,10 +4919,11 @@ static void wq_calc_pod_cpumask(struct workqueue_attrs *attrs, int cpu,
                                "possible intersect\n");
 }
 
-/* install @pwq into @wq's cpu_pwq and return the old pwq */
+/* install @pwq into @wq and return the old pwq, @cpu < 0 for dfl_pwq */
 static struct pool_workqueue *install_unbound_pwq(struct workqueue_struct *wq,
                                        int cpu, struct pool_workqueue *pwq)
 {
+       struct pool_workqueue __rcu **slot = unbound_pwq_slot(wq, cpu);
        struct pool_workqueue *old_pwq;
 
        lockdep_assert_held(&wq_pool_mutex);
@@ -4302,8 +4932,8 @@ static struct pool_workqueue *install_unbound_pwq(struct workqueue_struct *wq,
        /* link_pwq() can handle duplicate calls */
        link_pwq(pwq);
 
-       old_pwq = rcu_access_pointer(*per_cpu_ptr(wq->cpu_pwq, cpu));
-       rcu_assign_pointer(*per_cpu_ptr(wq->cpu_pwq, cpu), pwq);
+       old_pwq = rcu_access_pointer(*slot);
+       rcu_assign_pointer(*slot, pwq);
        return old_pwq;
 }
 
@@ -4384,6 +5014,15 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
        cpumask_copy(new_attrs->__pod_cpumask, new_attrs->cpumask);
        ctx->attrs = new_attrs;
 
+       /*
+        * For initialized ordered workqueues, there should only be one pwq
+        * (dfl_pwq). Set the plugged flag of ctx->dfl_pwq to suspend execution
+        * of newly queued work items until execution of older work items in
+        * the old pwqs has completed.
+        */
+       if ((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs))
+               ctx->dfl_pwq->plugged = true;
+
        ctx->wq = wq;
        return ctx;
 
@@ -4403,14 +5042,14 @@ static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
 
        copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
 
-       /* save the previous pwq and install the new one */
+       /* save the previous pwqs and install the new ones */
        for_each_possible_cpu(cpu)
                ctx->pwq_tbl[cpu] = install_unbound_pwq(ctx->wq, cpu,
                                                        ctx->pwq_tbl[cpu]);
+       ctx->dfl_pwq = install_unbound_pwq(ctx->wq, -1, ctx->dfl_pwq);
 
-       /* @dfl_pwq might not have been used, ensure it's linked */
-       link_pwq(ctx->dfl_pwq);
-       swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);
+       /* update node_nr_active->max */
+       wq_update_node_max_active(ctx->wq, -1);
 
        mutex_unlock(&ctx->wq->mutex);
 }
@@ -4424,14 +5063,6 @@ static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
        if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
                return -EINVAL;
 
-       /* creating multiple pwqs breaks ordering guarantee */
-       if (!list_empty(&wq->pwqs)) {
-               if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
-                       return -EINVAL;
-
-               wq->flags &= ~__WQ_ORDERED;
-       }
-
        ctx = apply_wqattrs_prepare(wq, attrs, wq_unbound_cpumask);
        if (IS_ERR(ctx))
                return PTR_ERR(ctx);
@@ -4520,9 +5151,7 @@ static void wq_update_pod(struct workqueue_struct *wq, int cpu,
 
        /* nothing to do if the target cpumask matches the current pwq */
        wq_calc_pod_cpumask(target_attrs, cpu, off_cpu);
-       pwq = rcu_dereference_protected(*per_cpu_ptr(wq->cpu_pwq, cpu),
-                                       lockdep_is_held(&wq_pool_mutex));
-       if (wqattrs_equal(target_attrs, pwq->pool->attrs))
+       if (wqattrs_equal(target_attrs, unbound_pwq(wq, cpu)->pool->attrs))
                return;
 
        /* create a new pwq */
@@ -4540,10 +5169,11 @@ static void wq_update_pod(struct workqueue_struct *wq, int cpu,
 
 use_dfl_pwq:
        mutex_lock(&wq->mutex);
-       raw_spin_lock_irq(&wq->dfl_pwq->pool->lock);
-       get_pwq(wq->dfl_pwq);
-       raw_spin_unlock_irq(&wq->dfl_pwq->pool->lock);
-       old_pwq = install_unbound_pwq(wq, cpu, wq->dfl_pwq);
+       pwq = unbound_pwq(wq, -1);
+       raw_spin_lock_irq(&pwq->pool->lock);
+       get_pwq(pwq);
+       raw_spin_unlock_irq(&pwq->pool->lock);
+       old_pwq = install_unbound_pwq(wq, cpu, pwq);
 out_unlock:
        mutex_unlock(&wq->mutex);
        put_pwq_unlocked(old_pwq);
@@ -4560,10 +5190,17 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 
        if (!(wq->flags & WQ_UNBOUND)) {
                for_each_possible_cpu(cpu) {
-                       struct pool_workqueue **pwq_p =
-                               per_cpu_ptr(wq->cpu_pwq, cpu);
-                       struct worker_pool *pool =
-                               &(per_cpu_ptr(cpu_worker_pools, cpu)[highpri]);
+                       struct pool_workqueue **pwq_p;
+                       struct worker_pool __percpu *pools;
+                       struct worker_pool *pool;
+
+                       if (wq->flags & WQ_BH)
+                               pools = bh_worker_pools;
+                       else
+                               pools = cpu_worker_pools;
+
+                       pool = &(per_cpu_ptr(pools, cpu)[highpri]);
+                       pwq_p = per_cpu_ptr(wq->cpu_pwq, cpu);
 
                        *pwq_p = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL,
                                                       pool->node);
@@ -4581,10 +5218,13 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 
        cpus_read_lock();
        if (wq->flags & __WQ_ORDERED) {
+               struct pool_workqueue *dfl_pwq;
+
                ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
                /* there should only be single pwq for ordering guarantee */
-               WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
-                             wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
+               dfl_pwq = rcu_access_pointer(wq->dfl_pwq);
+               WARN(!ret && (wq->pwqs.next != &dfl_pwq->pwqs_node ||
+                             wq->pwqs.prev != &dfl_pwq->pwqs_node),
                     "ordering guarantee broken for workqueue %s\n", wq->name);
        } else {
                ret = apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
@@ -4662,6 +5302,69 @@ static int init_rescuer(struct workqueue_struct *wq)
        return 0;
 }
 
+/**
+ * wq_adjust_max_active - update a wq's max_active to the current setting
+ * @wq: target workqueue
+ *
+ * If @wq isn't freezing, set @wq->max_active to the saved_max_active and
+ * activate inactive work items accordingly. If @wq is freezing, clear
+ * @wq->max_active to zero.
+ */
+static void wq_adjust_max_active(struct workqueue_struct *wq)
+{
+       bool activated;
+       int new_max, new_min;
+
+       lockdep_assert_held(&wq->mutex);
+
+       if ((wq->flags & WQ_FREEZABLE) && workqueue_freezing) {
+               new_max = 0;
+               new_min = 0;
+       } else {
+               new_max = wq->saved_max_active;
+               new_min = wq->saved_min_active;
+       }
+
+       if (wq->max_active == new_max && wq->min_active == new_min)
+               return;
+
+       /*
+        * Update @wq->max/min_active and then kick inactive work items if more
+        * active work items are allowed. This doesn't break work item ordering
+        * because new work items are always queued behind existing inactive
+        * work items if there are any.
+        */
+       WRITE_ONCE(wq->max_active, new_max);
+       WRITE_ONCE(wq->min_active, new_min);
+
+       if (wq->flags & WQ_UNBOUND)
+               wq_update_node_max_active(wq, -1);
+
+       if (new_max == 0)
+               return;
+
+       /*
+        * Round-robin through pwq's activating the first inactive work item
+        * until max_active is filled.
+        */
+       do {
+               struct pool_workqueue *pwq;
+
+               activated = false;
+               for_each_pwq(pwq, wq) {
+                       unsigned long flags;
+
+                       /* can be called during early boot w/ irq disabled */
+                       raw_spin_lock_irqsave(&pwq->pool->lock, flags);
+                       if (pwq_activate_first_inactive(pwq, true)) {
+                               activated = true;
+                               kick_pool(pwq->pool);
+                       }
+                       raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
+               }
+       } while (activated);
+}
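+
+/*
+ * Illustrative only, not part of the patch: a minimal, hypothetical
+ * module-style sketch (demo_wq, demo_fn, demo_work are made-up names)
+ * showing the caller-visible effect of wq_adjust_max_active() above. For a
+ * WQ_FREEZABLE workqueue, max_active is driven to 0 while the system is
+ * freezing, so work queued during that window stays inactive until
+ * thaw_workqueues() restores the saved value.
+ *
+ *	#include <linux/module.h>
+ *	#include <linux/workqueue.h>
+ *
+ *	static struct workqueue_struct *demo_wq;
+ *
+ *	static void demo_fn(struct work_struct *work)
+ *	{
+ *		// runs only while demo_wq's effective max_active is non-zero
+ *	}
+ *	static DECLARE_WORK(demo_work, demo_fn);
+ *
+ *	static int __init demo_init(void)
+ *	{
+ *		// saved_max_active = 4; wq_adjust_max_active() applies it,
+ *		// or 0 during freeze, under wq->mutex
+ *		demo_wq = alloc_workqueue("demo_freezable",
+ *					  WQ_UNBOUND | WQ_FREEZABLE, 4);
+ *		if (!demo_wq)
+ *			return -ENOMEM;
+ *		queue_work(demo_wq, &demo_work);
+ *		return 0;
+ *	}
+ *
+ *	static void __exit demo_exit(void)
+ *	{
+ *		destroy_workqueue(demo_wq);
+ *	}
+ *
+ *	module_init(demo_init);
+ *	module_exit(demo_exit);
+ *	MODULE_LICENSE("GPL");
+ */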
+
 __printf(1, 4)
 struct workqueue_struct *alloc_workqueue(const char *fmt,
                                         unsigned int flags,
@@ -4669,24 +5372,27 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 {
        va_list args;
        struct workqueue_struct *wq;
-       struct pool_workqueue *pwq;
-       int len;
+       size_t wq_size;
+       int name_len;
 
-       /*
-        * Unbound && max_active == 1 used to imply ordered, which is no longer
-        * the case on many machines due to per-pod pools. While
-        * alloc_ordered_workqueue() is the right way to create an ordered
-        * workqueue, keep the previous behavior to avoid subtle breakages.
-        */
-       if ((flags & WQ_UNBOUND) && max_active == 1)
-               flags |= __WQ_ORDERED;
+       if (flags & WQ_BH) {
+               if (WARN_ON_ONCE(flags & ~__WQ_BH_ALLOWS))
+                       return NULL;
+               if (WARN_ON_ONCE(max_active))
+                       return NULL;
+       }
 
        /* see the comment above the definition of WQ_POWER_EFFICIENT */
        if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
                flags |= WQ_UNBOUND;
 
        /* allocate wq and format name */
-       wq = kzalloc(sizeof(*wq), GFP_KERNEL);
+       if (flags & WQ_UNBOUND)
+               wq_size = struct_size(wq, node_nr_active, nr_node_ids + 1);
+       else
+               wq_size = sizeof(*wq);
+
+       wq = kzalloc(wq_size, GFP_KERNEL);
        if (!wq)
                return NULL;
 
@@ -4697,18 +5403,30 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
        }
 
        va_start(args, max_active);
-       len = vsnprintf(wq->name, sizeof(wq->name), fmt, args);
+       name_len = vsnprintf(wq->name, sizeof(wq->name), fmt, args);
        va_end(args);
 
-       if (len >= WQ_NAME_LEN)
-               pr_warn_once("workqueue: name exceeds WQ_NAME_LEN. Truncating to: %s\n", wq->name);
+       if (name_len >= WQ_NAME_LEN)
+               pr_warn_once("workqueue: name exceeds WQ_NAME_LEN. Truncating to: %s\n",
+                            wq->name);
 
-       max_active = max_active ?: WQ_DFL_ACTIVE;
-       max_active = wq_clamp_max_active(max_active, flags, wq->name);
+       if (flags & WQ_BH) {
+               /*
+                * BH workqueues always share a single execution context per CPU
+                * and don't impose any max_active limit.
+                */
+               max_active = INT_MAX;
+       } else {
+               max_active = max_active ?: WQ_DFL_ACTIVE;
+               max_active = wq_clamp_max_active(max_active, flags, wq->name);
+       }
 
        /* init wq */
        wq->flags = flags;
-       wq->saved_max_active = max_active;
+       wq->max_active = max_active;
+       wq->min_active = min(max_active, WQ_DFL_MIN_ACTIVE);
+       wq->saved_max_active = wq->max_active;
+       wq->saved_min_active = wq->min_active;
        mutex_init(&wq->mutex);
        atomic_set(&wq->nr_pwqs_to_flush, 0);
        INIT_LIST_HEAD(&wq->pwqs);
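
+/*
+ * Illustrative usage sketch of the WQ_BH path handled above, mirroring the
+ * system_bh_wq allocation later in this patch (my_bh_wq, my_bh_fn and
+ * mydrv_bh are hypothetical names). max_active must be passed as 0 and only
+ * the flags permitted by __WQ_BH_ALLOWS are accepted; queued work then runs
+ * in per-CPU softirq (BH) context instead of in a kworker task.
+ *
+ *	#include <linux/workqueue.h>
+ *
+ *	static void my_bh_fn(struct work_struct *work)
+ *	{
+ *		// executes in BH (softirq) context on the queueing CPU
+ *	}
+ *	static DECLARE_WORK(my_bh_work, my_bh_fn);
+ *
+ *	static struct workqueue_struct *my_bh_wq;
+ *
+ *	static int my_bh_setup(void)
+ *	{
+ *		// max_active is ignored for WQ_BH and must be 0
+ *		my_bh_wq = alloc_workqueue("mydrv_bh", WQ_BH | WQ_HIGHPRI, 0);
+ *		if (!my_bh_wq)
+ *			return -ENOMEM;
+ *
+ *		queue_work(my_bh_wq, &my_bh_work);
+ *		return 0;
+ *	}
+ */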
@@ -4719,8 +5437,13 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
        wq_init_lockdep(wq);
        INIT_LIST_HEAD(&wq->list);
 
+       if (flags & WQ_UNBOUND) {
+               if (alloc_node_nr_active(wq->node_nr_active) < 0)
+                       goto err_unreg_lockdep;
+       }
+
        if (alloc_and_link_pwqs(wq) < 0)
-               goto err_unreg_lockdep;
+               goto err_free_node_nr_active;
 
        if (wq_online && init_rescuer(wq) < 0)
                goto err_destroy;
@@ -4736,8 +5459,7 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
        mutex_lock(&wq_pool_mutex);
 
        mutex_lock(&wq->mutex);
-       for_each_pwq(pwq, wq)
-               pwq_adjust_max_active(pwq);
+       wq_adjust_max_active(wq);
        mutex_unlock(&wq->mutex);
 
        list_add_tail_rcu(&wq->list, &workqueues);
@@ -4746,6 +5468,9 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 
        return wq;
 
+err_free_node_nr_active:
+       if (wq->flags & WQ_UNBOUND)
+               free_node_nr_active(wq->node_nr_active);
 err_unreg_lockdep:
        wq_unregister_lockdep(wq);
        wq_free_lockdep(wq);
@@ -4767,9 +5492,9 @@ static bool pwq_busy(struct pool_workqueue *pwq)
                if (pwq->nr_in_flight[i])
                        return true;
 
-       if ((pwq != pwq->wq->dfl_pwq) && (pwq->refcnt > 1))
+       if ((pwq != rcu_access_pointer(pwq->wq->dfl_pwq)) && (pwq->refcnt > 1))
                return true;
-       if (pwq->nr_active || !list_empty(&pwq->inactive_works))
+       if (!pwq_is_empty(pwq))
                return true;
 
        return false;
@@ -4851,13 +5576,12 @@ void destroy_workqueue(struct workqueue_struct *wq)
        rcu_read_lock();
 
        for_each_possible_cpu(cpu) {
-               pwq = rcu_access_pointer(*per_cpu_ptr(wq->cpu_pwq, cpu));
-               RCU_INIT_POINTER(*per_cpu_ptr(wq->cpu_pwq, cpu), NULL);
-               put_pwq_unlocked(pwq);
+               put_pwq_unlocked(unbound_pwq(wq, cpu));
+               RCU_INIT_POINTER(*unbound_pwq_slot(wq, cpu), NULL);
        }
 
-       put_pwq_unlocked(wq->dfl_pwq);
-       wq->dfl_pwq = NULL;
+       put_pwq_unlocked(unbound_pwq(wq, -1));
+       RCU_INIT_POINTER(*unbound_pwq_slot(wq, -1), NULL);
 
        rcu_read_unlock();
 }
@@ -4868,28 +5592,30 @@ EXPORT_SYMBOL_GPL(destroy_workqueue);
  * @wq: target workqueue
  * @max_active: new max_active value.
  *
- * Set max_active of @wq to @max_active.
+ * Set max_active of @wq to @max_active. See the alloc_workqueue() function
+ * comment.
  *
  * CONTEXT:
  * Don't call from IRQ context.
  */
 void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
 {
-       struct pool_workqueue *pwq;
-
+       /* max_active doesn't mean anything for BH workqueues */
+       if (WARN_ON(wq->flags & WQ_BH))
+               return;
        /* disallow meddling with max_active for ordered workqueues */
-       if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
+       if (WARN_ON(wq->flags & __WQ_ORDERED))
                return;
 
        max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
 
        mutex_lock(&wq->mutex);
 
-       wq->flags &= ~__WQ_ORDERED;
        wq->saved_max_active = max_active;
+       if (wq->flags & WQ_UNBOUND)
+               wq->saved_min_active = min(wq->saved_min_active, max_active);
 
-       for_each_pwq(pwq, wq)
-               pwq_adjust_max_active(pwq);
+       wq_adjust_max_active(wq);
 
        mutex_unlock(&wq->mutex);
 }
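
+/*
+ * A brief, hypothetical caller-side sketch of the rules enforced above
+ * (io_wq and demo_io are made-up names): runtime adjustment is only
+ * meaningful for non-BH, non-ordered workqueues, and on unbound workqueues
+ * the new ceiling also caps saved_min_active.
+ *
+ *	#include <linux/workqueue.h>
+ *
+ *	static struct workqueue_struct *io_wq;
+ *
+ *	static int io_wq_setup(void)
+ *	{
+ *		io_wq = alloc_workqueue("demo_io", WQ_UNBOUND, 16);
+ *		if (!io_wq)
+ *			return -ENOMEM;
+ *
+ *		// Raise the ceiling later; takes wq->mutex and calls
+ *		// wq_adjust_max_active() internally. A WQ_BH or __WQ_ORDERED
+ *		// workqueue would just WARN and return unchanged.
+ *		workqueue_set_max_active(io_wq, 32);
+ *		return 0;
+ *	}
+ */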
@@ -5077,7 +5803,24 @@ static void pr_cont_pool_info(struct worker_pool *pool)
        pr_cont(" cpus=%*pbl", nr_cpumask_bits, pool->attrs->cpumask);
        if (pool->node != NUMA_NO_NODE)
                pr_cont(" node=%d", pool->node);
-       pr_cont(" flags=0x%x nice=%d", pool->flags, pool->attrs->nice);
+       pr_cont(" flags=0x%x", pool->flags);
+       if (pool->flags & POOL_BH)
+               pr_cont(" bh%s",
+                       pool->attrs->nice == HIGHPRI_NICE_LEVEL ? "-hi" : "");
+       else
+               pr_cont(" nice=%d", pool->attrs->nice);
+}
+
+static void pr_cont_worker_id(struct worker *worker)
+{
+       struct worker_pool *pool = worker->pool;
+
+       if (pool->flags & POOL_BH)
+               pr_cont("bh%s",
+                       pool->attrs->nice == HIGHPRI_NICE_LEVEL ? "-hi" : "");
+       else
+               pr_cont("%d%s", task_pid_nr(worker->task),
+                       worker->rescue_wq ? "(RESCUER)" : "");
 }
 
 struct pr_cont_work_struct {
@@ -5136,8 +5879,8 @@ static void show_pwq(struct pool_workqueue *pwq)
        pr_info("  pwq %d:", pool->id);
        pr_cont_pool_info(pool);
 
-       pr_cont(" active=%d/%d refcnt=%d%s\n",
-               pwq->nr_active, pwq->max_active, pwq->refcnt,
+       pr_cont(" active=%d refcnt=%d%s\n",
+               pwq->nr_active, pwq->refcnt,
                !list_empty(&pwq->mayday_node) ? " MAYDAY" : "");
 
        hash_for_each(pool->busy_hash, bkt, worker, hentry) {
@@ -5154,10 +5897,9 @@ static void show_pwq(struct pool_workqueue *pwq)
                        if (worker->current_pwq != pwq)
                                continue;
 
-                       pr_cont("%s %d%s:%ps", comma ? "," : "",
-                               task_pid_nr(worker->task),
-                               worker->rescue_wq ? "(RESCUER)" : "",
-                               worker->current_func);
+                       pr_cont(" %s", comma ? "," : "");
+                       pr_cont_worker_id(worker);
+                       pr_cont(":%ps", worker->current_func);
                        list_for_each_entry(work, &worker->scheduled, entry)
                                pr_cont_work(false, work, &pcws);
                        pr_cont_work_flush(comma, (work_func_t)-1L, &pcws);
@@ -5211,7 +5953,7 @@ void show_one_workqueue(struct workqueue_struct *wq)
        unsigned long flags;
 
        for_each_pwq(pwq, wq) {
-               if (pwq->nr_active || !list_empty(&pwq->inactive_works)) {
+               if (!pwq_is_empty(pwq)) {
                        idle = false;
                        break;
                }
@@ -5223,7 +5965,7 @@ void show_one_workqueue(struct workqueue_struct *wq)
 
        for_each_pwq(pwq, wq) {
                raw_spin_lock_irqsave(&pwq->pool->lock, flags);
-               if (pwq->nr_active || !list_empty(&pwq->inactive_works)) {
+               if (!pwq_is_empty(pwq)) {
                        /*
                         * Defer printing to avoid deadlocks in console
                         * drivers that queue work while holding locks
@@ -5276,8 +6018,8 @@ static void show_one_worker_pool(struct worker_pool *pool)
                pr_cont(" manager: %d",
                        task_pid_nr(pool->manager->task));
        list_for_each_entry(worker, &pool->idle_list, entry) {
-               pr_cont(" %s%d", first ? "idle: " : "",
-                       task_pid_nr(worker->task));
+               pr_cont(" %s", first ? "idle: " : "");
+               pr_cont_worker_id(worker);
                first = false;
        }
        pr_cont("\n");
@@ -5550,13 +6292,15 @@ int workqueue_online_cpu(unsigned int cpu)
        mutex_lock(&wq_pool_mutex);
 
        for_each_pool(pool, pi) {
-               mutex_lock(&wq_pool_attach_mutex);
+               /* BH pools aren't affected by hotplug */
+               if (pool->flags & POOL_BH)
+                       continue;
 
+               mutex_lock(&wq_pool_attach_mutex);
                if (pool->cpu == cpu)
                        rebind_workers(pool);
                else if (pool->cpu < 0)
                        restore_unbound_workers_cpumask(pool, cpu);
-
                mutex_unlock(&wq_pool_attach_mutex);
        }
 
@@ -5570,6 +6314,10 @@ int workqueue_online_cpu(unsigned int cpu)
 
                        for_each_cpu(tcpu, pt->pod_cpus[pt->cpu_pod[cpu]])
                                wq_update_pod(wq, tcpu, cpu, true);
+
+                       mutex_lock(&wq->mutex);
+                       wq_update_node_max_active(wq, -1);
+                       mutex_unlock(&wq->mutex);
                }
        }
 
@@ -5598,6 +6346,10 @@ int workqueue_offline_cpu(unsigned int cpu)
 
                        for_each_cpu(tcpu, pt->pod_cpus[pt->cpu_pod[cpu]])
                                wq_update_pod(wq, tcpu, cpu, false);
+
+                       mutex_lock(&wq->mutex);
+                       wq_update_node_max_active(wq, cpu);
+                       mutex_unlock(&wq->mutex);
                }
        }
        mutex_unlock(&wq_pool_mutex);
@@ -5685,7 +6437,6 @@ EXPORT_SYMBOL_GPL(work_on_cpu_safe_key);
 void freeze_workqueues_begin(void)
 {
        struct workqueue_struct *wq;
-       struct pool_workqueue *pwq;
 
        mutex_lock(&wq_pool_mutex);
 
@@ -5694,8 +6445,7 @@ void freeze_workqueues_begin(void)
 
        list_for_each_entry(wq, &workqueues, list) {
                mutex_lock(&wq->mutex);
-               for_each_pwq(pwq, wq)
-                       pwq_adjust_max_active(pwq);
+               wq_adjust_max_active(wq);
                mutex_unlock(&wq->mutex);
        }
 
@@ -5760,7 +6510,6 @@ out_unlock:
 void thaw_workqueues(void)
 {
        struct workqueue_struct *wq;
-       struct pool_workqueue *pwq;
 
        mutex_lock(&wq_pool_mutex);
 
@@ -5772,8 +6521,7 @@ void thaw_workqueues(void)
        /* restore max_active and repopulate worklist */
        list_for_each_entry(wq, &workqueues, list) {
                mutex_lock(&wq->mutex);
-               for_each_pwq(pwq, wq)
-                       pwq_adjust_max_active(pwq);
+               wq_adjust_max_active(wq);
                mutex_unlock(&wq->mutex);
        }
 
@@ -5792,16 +6540,9 @@ static int workqueue_apply_unbound_cpumask(const cpumask_var_t unbound_cpumask)
        lockdep_assert_held(&wq_pool_mutex);
 
        list_for_each_entry(wq, &workqueues, list) {
-               if (!(wq->flags & WQ_UNBOUND))
+               if (!(wq->flags & WQ_UNBOUND) || (wq->flags & __WQ_DESTROYING))
                        continue;
 
-               /* creating multiple pwqs breaks ordering guarantee */
-               if (!list_empty(&wq->pwqs)) {
-                       if (wq->flags & __WQ_ORDERED_EXPLICIT)
-                               continue;
-                       wq->flags &= ~__WQ_ORDERED;
-               }
-
                ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs, unbound_cpumask);
                if (IS_ERR(ctx)) {
                        ret = PTR_ERR(ctx);
@@ -6315,11 +7056,10 @@ int workqueue_sysfs_register(struct workqueue_struct *wq)
        int ret;
 
        /*
-        * Adjusting max_active or creating new pwqs by applying
-        * attributes breaks ordering guarantee.  Disallow exposing ordered
-        * workqueues.
+        * Adjusting max_active breaks ordering guarantee.  Disallow exposing
+        * ordered workqueues.
         */
-       if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
+       if (WARN_ON(wq->flags & __WQ_ORDERED))
                return -EINVAL;
 
        wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
@@ -6509,7 +7249,7 @@ static void wq_watchdog_timer_fn(struct timer_list *unused)
                /* did we stall? */
                if (time_after(now, ts + thresh)) {
                        lockup_detected = true;
-                       if (pool->cpu >= 0) {
+                       if (pool->cpu >= 0 && !(pool->flags & POOL_BH)) {
                                pool->cpu_stall = true;
                                cpu_pool_stall = true;
                        }
@@ -6603,6 +7343,22 @@ static void __init restrict_unbound_cpumask(const char *name, const struct cpuma
        cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, mask);
 }
 
+static void __init init_cpu_worker_pool(struct worker_pool *pool, int cpu, int nice)
+{
+       BUG_ON(init_worker_pool(pool));
+       pool->cpu = cpu;
+       cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
+       cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
+       pool->attrs->nice = nice;
+       pool->attrs->affn_strict = true;
+       pool->node = cpu_to_node(cpu);
+
+       /* alloc pool ID */
+       mutex_lock(&wq_pool_mutex);
+       BUG_ON(worker_pool_assign_id(pool));
+       mutex_unlock(&wq_pool_mutex);
+}
+
 /**
  * workqueue_init_early - early init for workqueue subsystem
  *
@@ -6658,25 +7414,19 @@ void __init workqueue_init_early(void)
        pt->pod_node[0] = NUMA_NO_NODE;
        pt->cpu_pod[0] = 0;
 
-       /* initialize CPU pools */
+       /* initialize BH and CPU pools */
        for_each_possible_cpu(cpu) {
                struct worker_pool *pool;
 
                i = 0;
-               for_each_cpu_worker_pool(pool, cpu) {
-                       BUG_ON(init_worker_pool(pool));
-                       pool->cpu = cpu;
-                       cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
-                       cpumask_copy(pool->attrs->__pod_cpumask, cpumask_of(cpu));
-                       pool->attrs->nice = std_nice[i++];
-                       pool->attrs->affn_strict = true;
-                       pool->node = cpu_to_node(cpu);
-
-                       /* alloc pool ID */
-                       mutex_lock(&wq_pool_mutex);
-                       BUG_ON(worker_pool_assign_id(pool));
-                       mutex_unlock(&wq_pool_mutex);
+               for_each_bh_worker_pool(pool, cpu) {
+                       init_cpu_worker_pool(pool, cpu, std_nice[i++]);
+                       pool->flags |= POOL_BH;
                }
+
+               i = 0;
+               for_each_cpu_worker_pool(pool, cpu)
+                       init_cpu_worker_pool(pool, cpu, std_nice[i++]);
        }
 
        /* create default unbound and ordered wq attrs */
@@ -6709,10 +7459,14 @@ void __init workqueue_init_early(void)
        system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_pwr_efficient",
                                              WQ_FREEZABLE | WQ_POWER_EFFICIENT,
                                              0);
+       system_bh_wq = alloc_workqueue("events_bh", WQ_BH, 0);
+       system_bh_highpri_wq = alloc_workqueue("events_bh_highpri",
+                                              WQ_BH | WQ_HIGHPRI, 0);
        BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
               !system_unbound_wq || !system_freezable_wq ||
               !system_power_efficient_wq ||
-              !system_freezable_power_efficient_wq);
+              !system_freezable_power_efficient_wq ||
+              !system_bh_wq || !system_bh_highpri_wq);
 }
 
 static void __init wq_cpu_intensive_thresh_init(void)
@@ -6778,9 +7532,10 @@ void __init workqueue_init(void)
         * up. Also, create a rescuer for workqueues that requested it.
         */
        for_each_possible_cpu(cpu) {
-               for_each_cpu_worker_pool(pool, cpu) {
+               for_each_bh_worker_pool(pool, cpu)
+                       pool->node = cpu_to_node(cpu);
+               for_each_cpu_worker_pool(pool, cpu)
                        pool->node = cpu_to_node(cpu);
-               }
        }
 
        list_for_each_entry(wq, &workqueues, list) {
@@ -6791,7 +7546,16 @@ void __init workqueue_init(void)
 
        mutex_unlock(&wq_pool_mutex);
 
-       /* create the initial workers */
+       /*
+        * Create the initial workers. A BH pool has one pseudo worker that
+        * represents the shared BH execution context and thus doesn't get
+        * affected by hotplug events. Create the BH pseudo workers for all
+        * possible CPUs here.
+        */
+       for_each_possible_cpu(cpu)
+               for_each_bh_worker_pool(pool, cpu)
+                       BUG_ON(!create_worker(pool));
+
        for_each_online_cpu(cpu) {
                for_each_cpu_worker_pool(pool, cpu) {
                        pool->flags &= ~POOL_DISASSOCIATED;
@@ -6871,7 +7635,7 @@ static bool __init cpus_share_numa(int cpu0, int cpu1)
 /**
  * workqueue_init_topology - initialize CPU pods for unbound workqueues
  *
- * This is the third step of there-staged workqueue subsystem initialization and
+ * This is the third step of three-staged workqueue subsystem initialization and
  * invoked after SMP and topology information are fully initialized. It
  * initializes the unbound CPU pods accordingly.
  */
@@ -6885,6 +7649,8 @@ void __init workqueue_init_topology(void)
        init_pod_type(&wq_pod_types[WQ_AFFN_CACHE], cpus_share_cache);
        init_pod_type(&wq_pod_types[WQ_AFFN_NUMA], cpus_share_numa);
 
+       wq_topo_initialized = true;
+
        mutex_lock(&wq_pool_mutex);
 
        /*
@@ -6893,8 +7659,12 @@ void __init workqueue_init_topology(void)
         * combinations to apply per-pod sharing.
         */
        list_for_each_entry(wq, &workqueues, list) {
-               for_each_online_cpu(cpu) {
+               for_each_online_cpu(cpu)
                        wq_update_pod(wq, cpu, cpu, true);
+               if (wq->flags & WQ_UNBOUND) {
+                       mutex_lock(&wq->mutex);
+                       wq_update_node_max_active(wq, -1);
+                       mutex_unlock(&wq->mutex);
                }
        }