rcu: Make call_rcu() lazy to save power
authorJoel Fernandes (Google) <joel@joelfernandes.org>
Sun, 16 Oct 2022 16:22:54 +0000 (16:22 +0000)
committerPaul E. McKenney <paulmck@kernel.org>
Tue, 29 Nov 2022 22:02:23 +0000 (14:02 -0800)
Implement timer-based RCU callback batching (also known as lazy
callbacks). With this we save about 5-10% of power consumed due
to RCU requests that happen when system is lightly loaded or idle.

By default, all async callbacks (queued via call_rcu) are marked
lazy. An alternate API call_rcu_hurry() is provided for the few users,
for example synchronize_rcu(), that need the old behavior.

The batch is flushed whenever a certain amount of time has passed, or
the batch on a particular CPU grows too big. Also memory pressure will
flush it in a future patch.

To handle several corner cases automagically (such as rcu_barrier() and
hotplug), we re-use bypass lists which were originally introduced to
address lock contention, to handle lazy CBs as well. The bypass list
length has the lazy CB length included in it. A separate lazy CB length
counter is also introduced to keep track of the number of lazy CBs.

[ paulmck: Fix formatting of inline call_rcu_lazy() definition. ]
[ paulmck: Apply Zqiang feedback. ]
[ paulmck: Apply s/call_rcu_flush/call_rcu_hurry/ feedback from Tejun Heo. ]

Suggested-by: Paul McKenney <paulmck@kernel.org>
Acked-by: Frederic Weisbecker <frederic@kernel.org>
Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org>
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
include/linux/rcupdate.h
kernel/rcu/Kconfig
kernel/rcu/rcu.h
kernel/rcu/tiny.c
kernel/rcu/tree.c
kernel/rcu/tree.h
kernel/rcu/tree_exp.h
kernel/rcu/tree_nocb.h

index 08605ce..611c113 100644 (file)
@@ -108,6 +108,15 @@ static inline int rcu_preempt_depth(void)
 
 #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
 
+#ifdef CONFIG_RCU_LAZY
+void call_rcu_hurry(struct rcu_head *head, rcu_callback_t func);
+#else
+static inline void call_rcu_hurry(struct rcu_head *head, rcu_callback_t func)
+{
+       call_rcu(head, func);
+}
+#endif
+
 /* Internal to kernel */
 void rcu_init(void);
 extern int rcu_scheduler_active;
index d471d22..d78f618 100644 (file)
@@ -311,4 +311,12 @@ config TASKS_TRACE_RCU_READ_MB
          Say N here if you hate read-side memory barriers.
          Take the default if you are unsure.
 
+config RCU_LAZY
+       bool "RCU callback lazy invocation functionality"
+       depends on RCU_NOCB_CPU
+       default n
+       help
+         To save power, batch RCU callbacks and flush after delay, memory
+         pressure, or callback list growing too big.
+
 endmenu # "RCU Subsystem"
index be5979d..65704cb 100644 (file)
@@ -474,6 +474,14 @@ enum rcutorture_type {
        INVALID_RCU_FLAVOR
 };
 
+#if defined(CONFIG_RCU_LAZY)
+unsigned long rcu_lazy_get_jiffies_till_flush(void);
+void rcu_lazy_set_jiffies_till_flush(unsigned long j);
+#else
+static inline unsigned long rcu_lazy_get_jiffies_till_flush(void) { return 0; }
+static inline void rcu_lazy_set_jiffies_till_flush(unsigned long j) { }
+#endif
+
 #if defined(CONFIG_TREE_RCU)
 void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags,
                            unsigned long *gp_seq);
index a33a8d4..72913ce 100644 (file)
@@ -44,7 +44,7 @@ static struct rcu_ctrlblk rcu_ctrlblk = {
 
 void rcu_barrier(void)
 {
-       wait_rcu_gp(call_rcu);
+       wait_rcu_gp(call_rcu_hurry);
 }
 EXPORT_SYMBOL(rcu_barrier);
 
index fb7a1b9..4b68e50 100644 (file)
@@ -2728,47 +2728,8 @@ static void check_cb_ovld(struct rcu_data *rdp)
        raw_spin_unlock_rcu_node(rnp);
 }
 
-/**
- * call_rcu() - Queue an RCU callback for invocation after a grace period.
- * @head: structure to be used for queueing the RCU updates.
- * @func: actual callback function to be invoked after the grace period
- *
- * The callback function will be invoked some time after a full grace
- * period elapses, in other words after all pre-existing RCU read-side
- * critical sections have completed.  However, the callback function
- * might well execute concurrently with RCU read-side critical sections
- * that started after call_rcu() was invoked.
- *
- * RCU read-side critical sections are delimited by rcu_read_lock()
- * and rcu_read_unlock(), and may be nested.  In addition, but only in
- * v5.0 and later, regions of code across which interrupts, preemption,
- * or softirqs have been disabled also serve as RCU read-side critical
- * sections.  This includes hardware interrupt handlers, softirq handlers,
- * and NMI handlers.
- *
- * Note that all CPUs must agree that the grace period extended beyond
- * all pre-existing RCU read-side critical section.  On systems with more
- * than one CPU, this means that when "func()" is invoked, each CPU is
- * guaranteed to have executed a full memory barrier since the end of its
- * last RCU read-side critical section whose beginning preceded the call
- * to call_rcu().  It also means that each CPU executing an RCU read-side
- * critical section that continues beyond the start of "func()" must have
- * executed a memory barrier after the call_rcu() but before the beginning
- * of that RCU read-side critical section.  Note that these guarantees
- * include CPUs that are offline, idle, or executing in user mode, as
- * well as CPUs that are executing in the kernel.
- *
- * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
- * resulting RCU callback function "func()", then both CPU A and CPU B are
- * guaranteed to execute a full memory barrier during the time interval
- * between the call to call_rcu() and the invocation of "func()" -- even
- * if CPU A and CPU B are the same CPU (but again only if the system has
- * more than one CPU).
- *
- * Implementation of these memory-ordering guarantees is described here:
- * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
- */
-void call_rcu(struct rcu_head *head, rcu_callback_t func)
+static void
+__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
 {
        static atomic_t doublefrees;
        unsigned long flags;
@@ -2809,7 +2770,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
        }
 
        check_cb_ovld(rdp);
-       if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
+       if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
                return; // Enqueued onto ->nocb_bypass, so just leave.
        // If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
        rcu_segcblist_enqueue(&rdp->cblist, head);
@@ -2831,8 +2792,84 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
                local_irq_restore(flags);
        }
 }
-EXPORT_SYMBOL_GPL(call_rcu);
 
+#ifdef CONFIG_RCU_LAZY
+/**
+ * call_rcu_hurry() - Queue RCU callback for invocation after grace period, and
+ * flush all lazy callbacks (including the new one) to the main ->cblist while
+ * doing so.
+ *
+ * @head: structure to be used for queueing the RCU updates.
+ * @func: actual callback function to be invoked after the grace period
+ *
+ * The callback function will be invoked some time after a full grace
+ * period elapses, in other words after all pre-existing RCU read-side
+ * critical sections have completed.
+ *
+ * Use this API instead of call_rcu() if you don't want the callback to be
+ * invoked after very long periods of time, which can happen on systems without
+ * memory pressure and on systems which are lightly loaded or mostly idle.
+ * This function will cause callbacks to be invoked sooner than later at the
+ * expense of extra power. Other than that, this function is identical to, and
+ * reuses call_rcu()'s logic. Refer to call_rcu() for more details about memory
+ * ordering and other functionality.
+ */
+void call_rcu_hurry(struct rcu_head *head, rcu_callback_t func)
+{
+       return __call_rcu_common(head, func, false);
+}
+EXPORT_SYMBOL_GPL(call_rcu_hurry);
+#endif
+
+/**
+ * call_rcu() - Queue an RCU callback for invocation after a grace period.
+ * By default the callbacks are 'lazy' and are kept hidden from the main
+ * ->cblist to prevent starting of grace periods too soon.
+ * If you desire grace periods to start very soon, use call_rcu_hurry().
+ *
+ * @head: structure to be used for queueing the RCU updates.
+ * @func: actual callback function to be invoked after the grace period
+ *
+ * The callback function will be invoked some time after a full grace
+ * period elapses, in other words after all pre-existing RCU read-side
+ * critical sections have completed.  However, the callback function
+ * might well execute concurrently with RCU read-side critical sections
+ * that started after call_rcu() was invoked.
+ *
+ * RCU read-side critical sections are delimited by rcu_read_lock()
+ * and rcu_read_unlock(), and may be nested.  In addition, but only in
+ * v5.0 and later, regions of code across which interrupts, preemption,
+ * or softirqs have been disabled also serve as RCU read-side critical
+ * sections.  This includes hardware interrupt handlers, softirq handlers,
+ * and NMI handlers.
+ *
+ * Note that all CPUs must agree that the grace period extended beyond
+ * all pre-existing RCU read-side critical section.  On systems with more
+ * than one CPU, this means that when "func()" is invoked, each CPU is
+ * guaranteed to have executed a full memory barrier since the end of its
+ * last RCU read-side critical section whose beginning preceded the call
+ * to call_rcu().  It also means that each CPU executing an RCU read-side
+ * critical section that continues beyond the start of "func()" must have
+ * executed a memory barrier after the call_rcu() but before the beginning
+ * of that RCU read-side critical section.  Note that these guarantees
+ * include CPUs that are offline, idle, or executing in user mode, as
+ * well as CPUs that are executing in the kernel.
+ *
+ * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
+ * resulting RCU callback function "func()", then both CPU A and CPU B are
+ * guaranteed to execute a full memory barrier during the time interval
+ * between the call to call_rcu() and the invocation of "func()" -- even
+ * if CPU A and CPU B are the same CPU (but again only if the system has
+ * more than one CPU).
+ *
+ * Implementation of these memory-ordering guarantees is described here:
+ * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
+ */
+void call_rcu(struct rcu_head *head, rcu_callback_t func)
+{
+       return __call_rcu_common(head, func, IS_ENABLED(CONFIG_RCU_LAZY));
+}
+EXPORT_SYMBOL_GPL(call_rcu);
 
 /* Maximum number of jiffies to wait before draining a batch. */
 #define KFREE_DRAIN_JIFFIES (5 * HZ)
@@ -3507,7 +3544,7 @@ void synchronize_rcu(void)
                if (rcu_gp_is_expedited())
                        synchronize_rcu_expedited();
                else
-                       wait_rcu_gp(call_rcu);
+                       wait_rcu_gp(call_rcu_hurry);
                return;
        }
 
@@ -3910,7 +3947,7 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
         * if it's fully lazy.
         */
        was_alldone = rcu_rdp_is_offloaded(rdp) && !rcu_segcblist_pend_cbs(&rdp->cblist);
-       WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
+       WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false));
        wake_nocb = was_alldone && rcu_segcblist_pend_cbs(&rdp->cblist);
        if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
                atomic_inc(&rcu_state.barrier_cpu_count);
@@ -4336,7 +4373,7 @@ void rcutree_migrate_callbacks(int cpu)
        my_rdp = this_cpu_ptr(&rcu_data);
        my_rnp = my_rdp->mynode;
        rcu_nocb_lock(my_rdp); /* irqs already disabled. */
-       WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies));
+       WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, false));
        raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
        /* Leverage recent GPs and set GP for new callbacks. */
        needwake = rcu_advance_cbs(my_rnp, rdp) ||
index 925dd98..fcb5d69 100644 (file)
@@ -263,14 +263,16 @@ struct rcu_data {
        unsigned long last_fqs_resched; /* Time of last rcu_resched(). */
        unsigned long last_sched_clock; /* Jiffies of last rcu_sched_clock_irq(). */
 
+       long lazy_len;                  /* Length of buffered lazy callbacks. */
        int cpu;
 };
 
 /* Values for nocb_defer_wakeup field in struct rcu_data. */
 #define RCU_NOCB_WAKE_NOT      0
 #define RCU_NOCB_WAKE_BYPASS   1
-#define RCU_NOCB_WAKE          2
-#define RCU_NOCB_WAKE_FORCE    3
+#define RCU_NOCB_WAKE_LAZY     2
+#define RCU_NOCB_WAKE          3
+#define RCU_NOCB_WAKE_FORCE    4
 
 #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
                                        /* For jiffies_till_first_fqs and */
@@ -441,9 +443,10 @@ static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
 static void rcu_init_one_nocb(struct rcu_node *rnp);
 static bool wake_nocb_gp(struct rcu_data *rdp, bool force);
 static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-                                 unsigned long j);
+                                 unsigned long j, bool lazy);
 static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-                               bool *was_alldone, unsigned long flags);
+                               bool *was_alldone, unsigned long flags,
+                               bool lazy);
 static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
                                 unsigned long flags);
 static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
index 18e9b4c..ed6c3cc 100644 (file)
@@ -937,7 +937,7 @@ void synchronize_rcu_expedited(void)
 
        /* If expedited grace periods are prohibited, fall back to normal. */
        if (rcu_gp_is_normal()) {
-               wait_rcu_gp(call_rcu);
+               wait_rcu_gp(call_rcu_hurry);
                return;
        }
 
index 094fd45..d6e4c07 100644 (file)
@@ -256,6 +256,31 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
        return __wake_nocb_gp(rdp_gp, rdp, force, flags);
 }
 
+/*
+ * LAZY_FLUSH_JIFFIES decides the maximum amount of time that
+ * can elapse before lazy callbacks are flushed. Lazy callbacks
+ * could be flushed much earlier for a number of other reasons
+ * however, LAZY_FLUSH_JIFFIES will ensure no lazy callbacks are
+ * left unsubmitted to RCU after those many jiffies.
+ */
+#define LAZY_FLUSH_JIFFIES (10 * HZ)
+static unsigned long jiffies_till_flush = LAZY_FLUSH_JIFFIES;
+
+#ifdef CONFIG_RCU_LAZY
+// To be called only from test code.
+void rcu_lazy_set_jiffies_till_flush(unsigned long jif)
+{
+       jiffies_till_flush = jif;
+}
+EXPORT_SYMBOL(rcu_lazy_set_jiffies_till_flush);
+
+unsigned long rcu_lazy_get_jiffies_till_flush(void)
+{
+       return jiffies_till_flush;
+}
+EXPORT_SYMBOL(rcu_lazy_get_jiffies_till_flush);
+#endif
+
 /*
  * Arrange to wake the GP kthread for this NOCB group at some future
  * time when it is safe to do so.
@@ -269,10 +294,14 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
        raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
 
        /*
-        * Bypass wakeup overrides previous deferments. In case
-        * of callback storm, no need to wake up too early.
+        * Bypass wakeup overrides previous deferments. In case of
+        * callback storms, no need to wake up too early.
         */
-       if (waketype == RCU_NOCB_WAKE_BYPASS) {
+       if (waketype == RCU_NOCB_WAKE_LAZY &&
+           rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT) {
+               mod_timer(&rdp_gp->nocb_timer, jiffies + jiffies_till_flush);
+               WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
+       } else if (waketype == RCU_NOCB_WAKE_BYPASS) {
                mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
                WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
        } else {
@@ -293,10 +322,13 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
  * proves to be initially empty, just return false because the no-CB GP
  * kthread may need to be awakened in this case.
  *
+ * Return true if there was something to be flushed and it succeeded, otherwise
+ * false.
+ *
  * Note that this function always returns true if rhp is NULL.
  */
 static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-                                    unsigned long j)
+                                    unsigned long j, bool lazy)
 {
        struct rcu_cblist rcl;
 
@@ -310,7 +342,20 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
        /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
        if (rhp)
                rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
-       rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
+
+       /*
+        * If the new CB requested was a lazy one, queue it onto the main
+        * ->cblist so we can take advantage of a sooner grade period.
+        */
+       if (lazy && rhp) {
+               rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, NULL);
+               rcu_cblist_enqueue(&rcl, rhp);
+               WRITE_ONCE(rdp->lazy_len, 0);
+       } else {
+               rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
+               WRITE_ONCE(rdp->lazy_len, 0);
+       }
+
        rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
        WRITE_ONCE(rdp->nocb_bypass_first, j);
        rcu_nocb_bypass_unlock(rdp);
@@ -326,13 +371,13 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
  * Note that this function always returns true if rhp is NULL.
  */
 static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-                                 unsigned long j)
+                                 unsigned long j, bool lazy)
 {
        if (!rcu_rdp_is_offloaded(rdp))
                return true;
        rcu_lockdep_assert_cblist_protected(rdp);
        rcu_nocb_bypass_lock(rdp);
-       return rcu_nocb_do_flush_bypass(rdp, rhp, j);
+       return rcu_nocb_do_flush_bypass(rdp, rhp, j, lazy);
 }
 
 /*
@@ -345,7 +390,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
        if (!rcu_rdp_is_offloaded(rdp) ||
            !rcu_nocb_bypass_trylock(rdp))
                return;
-       WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
+       WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, false));
 }
 
 /*
@@ -367,12 +412,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
  * there is only one CPU in operation.
  */
 static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-                               bool *was_alldone, unsigned long flags)
+                               bool *was_alldone, unsigned long flags,
+                               bool lazy)
 {
        unsigned long c;
        unsigned long cur_gp_seq;
        unsigned long j = jiffies;
        long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+       bool bypass_is_lazy = (ncbs == READ_ONCE(rdp->lazy_len));
 
        lockdep_assert_irqs_disabled();
 
@@ -417,25 +464,29 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
        // If there hasn't yet been all that many ->cblist enqueues
        // this jiffy, tell the caller to enqueue onto ->cblist.  But flush
        // ->nocb_bypass first.
-       if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
+       // Lazy CBs throttle this back and do immediate bypass queuing.
+       if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) {
                rcu_nocb_lock(rdp);
                *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
                if (*was_alldone)
                        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
                                            TPS("FirstQ"));
-               WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
+
+               WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, false));
                WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
                return false; // Caller must enqueue the callback.
        }
 
        // If ->nocb_bypass has been used too long or is too full,
        // flush ->nocb_bypass to ->cblist.
-       if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
+       if ((ncbs && !bypass_is_lazy && j != READ_ONCE(rdp->nocb_bypass_first)) ||
+           (ncbs &&  bypass_is_lazy &&
+            (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush))) ||
            ncbs >= qhimark) {
                rcu_nocb_lock(rdp);
                *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
 
-               if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
+               if (!rcu_nocb_flush_bypass(rdp, rhp, j, lazy)) {
                        if (*was_alldone)
                                trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
                                                    TPS("FirstQ"));
@@ -463,13 +514,24 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
        ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
        rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
        rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
+
+       if (lazy)
+               WRITE_ONCE(rdp->lazy_len, rdp->lazy_len + 1);
+
        if (!ncbs) {
                WRITE_ONCE(rdp->nocb_bypass_first, j);
                trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
        }
        rcu_nocb_bypass_unlock(rdp);
        smp_mb(); /* Order enqueue before wake. */
-       if (ncbs) {
+       // A wake up of the grace period kthread or timer adjustment
+       // needs to be done only if:
+       // 1. Bypass list was fully empty before (this is the first
+       //    bypass list entry), or:
+       // 2. Both of these conditions are met:
+       //    a. The bypass list previously had only lazy CBs, and:
+       //    b. The new CB is non-lazy.
+       if (ncbs && (!bypass_is_lazy || lazy)) {
                local_irq_restore(flags);
        } else {
                // No-CBs GP kthread might be indefinitely asleep, if so, wake.
@@ -497,8 +559,10 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
                                 unsigned long flags)
                                 __releases(rdp->nocb_lock)
 {
+       long bypass_len;
        unsigned long cur_gp_seq;
        unsigned long j;
+       long lazy_len;
        long len;
        struct task_struct *t;
 
@@ -512,9 +576,16 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
        }
        // Need to actually to a wakeup.
        len = rcu_segcblist_n_cbs(&rdp->cblist);
+       bypass_len = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+       lazy_len = READ_ONCE(rdp->lazy_len);
        if (was_alldone) {
                rdp->qlen_last_fqs_check = len;
-               if (!irqs_disabled_flags(flags)) {
+               // Only lazy CBs in bypass list
+               if (lazy_len && bypass_len == lazy_len) {
+                       rcu_nocb_unlock_irqrestore(rdp, flags);
+                       wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY,
+                                          TPS("WakeLazy"));
+               } else if (!irqs_disabled_flags(flags)) {
                        /* ... if queue was empty ... */
                        rcu_nocb_unlock_irqrestore(rdp, flags);
                        wake_nocb_gp(rdp, false);
@@ -605,12 +676,12 @@ static void nocb_gp_sleep(struct rcu_data *my_rdp, int cpu)
 static void nocb_gp_wait(struct rcu_data *my_rdp)
 {
        bool bypass = false;
-       long bypass_ncbs;
        int __maybe_unused cpu = my_rdp->cpu;
        unsigned long cur_gp_seq;
        unsigned long flags;
        bool gotcbs = false;
        unsigned long j = jiffies;
+       bool lazy = false;
        bool needwait_gp = false; // This prevents actual uninitialized use.
        bool needwake;
        bool needwake_gp;
@@ -640,24 +711,43 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
         * won't be ignored for long.
         */
        list_for_each_entry(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp) {
+               long bypass_ncbs;
+               bool flush_bypass = false;
+               long lazy_ncbs;
+
                trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
                rcu_nocb_lock_irqsave(rdp, flags);
                lockdep_assert_held(&rdp->nocb_lock);
                bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-               if (bypass_ncbs &&
+               lazy_ncbs = READ_ONCE(rdp->lazy_len);
+
+               if (bypass_ncbs && (lazy_ncbs == bypass_ncbs) &&
+                   (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush) ||
+                    bypass_ncbs > 2 * qhimark)) {
+                       flush_bypass = true;
+               } else if (bypass_ncbs && (lazy_ncbs != bypass_ncbs) &&
                    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
                     bypass_ncbs > 2 * qhimark)) {
-                       // Bypass full or old, so flush it.
-                       (void)rcu_nocb_try_flush_bypass(rdp, j);
-                       bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+                       flush_bypass = true;
                } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
                        rcu_nocb_unlock_irqrestore(rdp, flags);
                        continue; /* No callbacks here, try next. */
                }
+
+               if (flush_bypass) {
+                       // Bypass full or old, so flush it.
+                       (void)rcu_nocb_try_flush_bypass(rdp, j);
+                       bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+                       lazy_ncbs = READ_ONCE(rdp->lazy_len);
+               }
+
                if (bypass_ncbs) {
                        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-                                           TPS("Bypass"));
-                       bypass = true;
+                                           bypass_ncbs == lazy_ncbs ? TPS("Lazy") : TPS("Bypass"));
+                       if (bypass_ncbs == lazy_ncbs)
+                               lazy = true;
+                       else
+                               bypass = true;
                }
                rnp = rdp->mynode;
 
@@ -705,12 +795,20 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
        my_rdp->nocb_gp_gp = needwait_gp;
        my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
 
-       if (bypass && !rcu_nocb_poll) {
-               // At least one child with non-empty ->nocb_bypass, so set
-               // timer in order to avoid stranding its callbacks.
-               wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
-                                  TPS("WakeBypassIsDeferred"));
+       // At least one child with non-empty ->nocb_bypass, so set
+       // timer in order to avoid stranding its callbacks.
+       if (!rcu_nocb_poll) {
+               // If bypass list only has lazy CBs. Add a deferred lazy wake up.
+               if (lazy && !bypass) {
+                       wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_LAZY,
+                                       TPS("WakeLazyIsDeferred"));
+               // Otherwise add a deferred bypass wake up.
+               } else if (bypass) {
+                       wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
+                                       TPS("WakeBypassIsDeferred"));
+               }
        }
+
        if (rcu_nocb_poll) {
                /* Polling, so trace if first poll in the series. */
                if (gotcbs)
@@ -1036,7 +1134,7 @@ static long rcu_nocb_rdp_deoffload(void *arg)
         * return false, which means that future calls to rcu_nocb_try_bypass()
         * will refuse to put anything into the bypass.
         */
-       WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
+       WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, false));
        /*
         * Start with invoking rcu_core() early. This way if the current thread
         * happens to preempt an ongoing call to rcu_core() in the middle,
@@ -1278,6 +1376,7 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
        raw_spin_lock_init(&rdp->nocb_gp_lock);
        timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
        rcu_cblist_init(&rdp->nocb_bypass);
+       WRITE_ONCE(rdp->lazy_len, 0);
        mutex_init(&rdp->nocb_gp_kthread_mutex);
 }
 
@@ -1564,13 +1663,13 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
 }
 
 static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-                                 unsigned long j)
+                                 unsigned long j, bool lazy)
 {
        return true;
 }
 
 static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-                               bool *was_alldone, unsigned long flags)
+                               bool *was_alldone, unsigned long flags, bool lazy)
 {
        return false;
 }