diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index a14e5fb..8110514 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -56,6 +56,7 @@
 #include <linux/smpboot.h>
 #include <linux/jiffies.h>
 #include <linux/sched/isolation.h>
+#include <linux/sched/clock.h>
 #include "../time/tick-internal.h"
 
 #include "tree.h"
@@ -210,9 +211,9 @@ static long rcu_get_n_cbs_cpu(int cpu)
 {
        struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
 
-       if (rcu_segcblist_is_enabled(&rdp->cblist)) /* Online normal CPU? */
+       if (rcu_segcblist_is_enabled(&rdp->cblist))
                return rcu_segcblist_n_cbs(&rdp->cblist);
-       return rcu_get_n_cbs_nocb_cpu(rdp); /* Works for offline, too. */
+       return 0;
 }
 
 void rcu_softirq_qs(void)
@@ -416,6 +417,12 @@ module_param(qlowmark, long, 0444);
 static ulong jiffies_till_first_fqs = ULONG_MAX;
 static ulong jiffies_till_next_fqs = ULONG_MAX;
 static bool rcu_kick_kthreads;
+static int rcu_divisor = 7;
+module_param(rcu_divisor, int, 0644);
+
+/* Force an exit from rcu_do_batch() after 3 milliseconds. */
+static long rcu_resched_ns = 3 * NSEC_PER_MSEC;
+module_param(rcu_resched_ns, long, 0644);
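Note: these two knobs feed the batch-limit computation added to rcu_do_batch()
further down in this patch.  A minimal sketch of that arithmetic, assuming an
illustrative backlog of 20000 callbacks and an assumed default blimit of 10
(both values made up for the example, not measured):

    long pending = 20000;                        /* assumed callback backlog */
    long bl = max(10L, pending >> rcu_divisor);  /* 20000 >> 7 = 156 */
    long tlimit = 0;

    if (unlikely(bl > 100))        /* only large batches get a time limit */
            tlimit = local_clock() + rcu_resched_ns;   /* bail out ~3 ms from now */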
 
 /*
  * How long the grace period must be before we start recruiting
@@ -1251,6 +1258,7 @@ static bool rcu_accelerate_cbs(struct rcu_node *rnp, struct rcu_data *rdp)
        unsigned long gp_seq_req;
        bool ret = false;
 
+       rcu_lockdep_assert_cblist_protected(rdp);
        raw_lockdep_assert_held_rcu_node(rnp);
 
        /* If no pending (not yet ready to invoke) callbacks, nothing to do. */
@@ -1292,7 +1300,7 @@ static void rcu_accelerate_cbs_unlocked(struct rcu_node *rnp,
        unsigned long c;
        bool needwake;
 
-       lockdep_assert_irqs_disabled();
+       rcu_lockdep_assert_cblist_protected(rdp);
        c = rcu_seq_snap(&rcu_state.gp_seq);
        if (!rdp->gpwrap && ULONG_CMP_GE(rdp->gp_seq_needed, c)) {
                /* Old request still live, so mark recent callbacks. */
@@ -1318,6 +1326,7 @@ static void rcu_accelerate_cbs_unlocked(struct rcu_node *rnp,
  */
 static bool rcu_advance_cbs(struct rcu_node *rnp, struct rcu_data *rdp)
 {
+       rcu_lockdep_assert_cblist_protected(rdp);
        raw_lockdep_assert_held_rcu_node(rnp);
 
        /* If no pending (not yet ready to invoke) callbacks, nothing to do. */
@@ -1334,6 +1343,21 @@ static bool rcu_advance_cbs(struct rcu_node *rnp, struct rcu_data *rdp)
        return rcu_accelerate_cbs(rnp, rdp);
 }
 
+/*
+ * Move and classify callbacks, but only if doing so won't require
+ * that the RCU grace-period kthread be awakened.
+ */
+static void __maybe_unused rcu_advance_cbs_nowake(struct rcu_node *rnp,
+                                                 struct rcu_data *rdp)
+{
+       rcu_lockdep_assert_cblist_protected(rdp);
+       if (!rcu_seq_state(rcu_seq_current(&rnp->gp_seq)) ||
+           !raw_spin_trylock_rcu_node(rnp))
+               return;
+       WARN_ON_ONCE(rcu_advance_cbs(rnp, rdp));
+       raw_spin_unlock_rcu_node(rnp);
+}
+
 /*
  * Update CPU-local rcu_data state to record the beginnings and ends of
  * grace periods.  The caller must hold the ->lock of the leaf rcu_node
@@ -1342,8 +1366,10 @@ static bool rcu_advance_cbs(struct rcu_node *rnp, struct rcu_data *rdp)
  */
 static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp)
 {
-       bool ret;
+       bool ret = false;
        bool need_gp;
+       const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+                              rcu_segcblist_is_offloaded(&rdp->cblist);
 
        raw_lockdep_assert_held_rcu_node(rnp);
 
@@ -1353,10 +1379,12 @@ static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp)
        /* Handle the ends of any preceding grace periods first. */
        if (rcu_seq_completed_gp(rdp->gp_seq, rnp->gp_seq) ||
            unlikely(READ_ONCE(rdp->gpwrap))) {
-               ret = rcu_advance_cbs(rnp, rdp); /* Advance callbacks. */
+               if (!offloaded)
+                       ret = rcu_advance_cbs(rnp, rdp); /* Advance CBs. */
                trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("cpuend"));
        } else {
-               ret = rcu_accelerate_cbs(rnp, rdp); /* Recent callbacks. */
+               if (!offloaded)
+                       ret = rcu_accelerate_cbs(rnp, rdp); /* Recent CBs. */
        }
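Note: the IS_ENABLED(CONFIG_RCU_NOCB_CPU) && rcu_segcblist_is_offloaded(&rdp->cblist)
test above recurs throughout this patch (__note_gp_changes(), rcu_report_qs_rdp(),
rcu_do_batch(), rcu_core()).  A hypothetical helper capturing that guard, not part
of the patch itself, would look like:

    /* Hypothetical wrapper, not in this patch: true iff this CPU's
     * callbacks are handled by a no-CBs (offloaded) kthread. */
    static inline bool rdp_is_offloaded(struct rcu_data *rdp)
    {
            return IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
                   rcu_segcblist_is_offloaded(&rdp->cblist);
    }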
 
        /* Now handle the beginnings of any new-to-this-CPU grace periods. */
@@ -1657,6 +1685,7 @@ static void rcu_gp_cleanup(void)
        unsigned long gp_duration;
        bool needgp = false;
        unsigned long new_gp_seq;
+       bool offloaded;
        struct rcu_data *rdp;
        struct rcu_node *rnp = rcu_get_root();
        struct swait_queue_head *sq;
@@ -1722,7 +1751,9 @@ static void rcu_gp_cleanup(void)
                needgp = true;
        }
        /* Advance CBs to reduce false positives below. */
-       if (!rcu_accelerate_cbs(rnp, rdp) && needgp) {
+       offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+                   rcu_segcblist_is_offloaded(&rdp->cblist);
+       if ((offloaded || !rcu_accelerate_cbs(rnp, rdp)) && needgp) {
                WRITE_ONCE(rcu_state.gp_flags, RCU_GP_FLAG_INIT);
                rcu_state.gp_req_activity = jiffies;
                trace_rcu_grace_period(rcu_state.name,
@@ -1881,7 +1912,7 @@ rcu_report_unblock_qs_rnp(struct rcu_node *rnp, unsigned long flags)
        struct rcu_node *rnp_p;
 
        raw_lockdep_assert_held_rcu_node(rnp);
-       if (WARN_ON_ONCE(!IS_ENABLED(CONFIG_PREEMPT)) ||
+       if (WARN_ON_ONCE(!IS_ENABLED(CONFIG_PREEMPTION)) ||
            WARN_ON_ONCE(rcu_preempt_blocked_readers_cgp(rnp)) ||
            rnp->qsmask != 0) {
                raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
@@ -1916,7 +1947,9 @@ rcu_report_qs_rdp(int cpu, struct rcu_data *rdp)
 {
        unsigned long flags;
        unsigned long mask;
-       bool needwake;
+       bool needwake = false;
+       const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+                              rcu_segcblist_is_offloaded(&rdp->cblist);
        struct rcu_node *rnp;
 
        rnp = rdp->mynode;
@@ -1943,7 +1976,8 @@ rcu_report_qs_rdp(int cpu, struct rcu_data *rdp)
                 * This GP can't end until cpu checks in, so all of our
                 * callbacks can be processed during the next GP.
                 */
-               needwake = rcu_accelerate_cbs(rnp, rdp);
+               if (!offloaded)
+                       needwake = rcu_accelerate_cbs(rnp, rdp);
 
                rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags);
                /* ^^^ Released rnp->lock */
@@ -2077,9 +2111,12 @@ int rcutree_dead_cpu(unsigned int cpu)
 static void rcu_do_batch(struct rcu_data *rdp)
 {
        unsigned long flags;
+       const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+                              rcu_segcblist_is_offloaded(&rdp->cblist);
        struct rcu_head *rhp;
        struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
        long bl, count;
+       long pending, tlimit = 0;
 
        /* If no callbacks are ready, just return. */
        if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
@@ -2099,13 +2136,19 @@ static void rcu_do_batch(struct rcu_data *rdp)
         * callback counts, as rcu_barrier() needs to be conservative.
         */
        local_irq_save(flags);
+       rcu_nocb_lock(rdp);
        WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));
-       bl = rdp->blimit;
+       pending = rcu_segcblist_n_cbs(&rdp->cblist);
+       bl = max(rdp->blimit, pending >> rcu_divisor);
+       if (unlikely(bl > 100))
+               tlimit = local_clock() + rcu_resched_ns;
        trace_rcu_batch_start(rcu_state.name,
                              rcu_segcblist_n_lazy_cbs(&rdp->cblist),
                              rcu_segcblist_n_cbs(&rdp->cblist), bl);
        rcu_segcblist_extract_done_cbs(&rdp->cblist, &rcl);
-       local_irq_restore(flags);
+       if (offloaded)
+               rdp->qlen_last_fqs_check = rcu_segcblist_n_cbs(&rdp->cblist);
+       rcu_nocb_unlock_irqrestore(rdp, flags);
 
        /* Invoke callbacks. */
        rhp = rcu_cblist_dequeue(&rcl);
@@ -2117,13 +2160,29 @@ static void rcu_do_batch(struct rcu_data *rdp)
                 * Stop only if limit reached and CPU has something to do.
                 * Note: The rcl structure counts down from zero.
                 */
-               if (-rcl.len >= bl &&
+               if (-rcl.len >= bl && !offloaded &&
                    (need_resched() ||
                     (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
                        break;
+               if (unlikely(tlimit)) {
+                       /* only call local_clock() every 32 callbacks */
+                       if (likely((-rcl.len & 31) || local_clock() < tlimit))
+                               continue;
+                       /* Exceeded the time limit, so leave. */
+                       break;
+               }
+               if (offloaded) {
+                       WARN_ON_ONCE(in_serving_softirq());
+                       local_bh_enable();
+                       lockdep_assert_irqs_enabled();
+                       cond_resched_tasks_rcu_qs();
+                       lockdep_assert_irqs_enabled();
+                       local_bh_disable();
+               }
        }
 
        local_irq_save(flags);
+       rcu_nocb_lock(rdp);
        count = -rcl.len;
        trace_rcu_batch_end(rcu_state.name, count, !!rcl.head, need_resched(),
                            is_idle_task(current), rcu_is_callbacks_kthread());
@@ -2149,12 +2208,14 @@ static void rcu_do_batch(struct rcu_data *rdp)
         * The following usually indicates a double call_rcu().  To track
         * this down, try building with CONFIG_DEBUG_OBJECTS_RCU_HEAD=y.
         */
-       WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) != (count == 0));
+       WARN_ON_ONCE(count == 0 && !rcu_segcblist_empty(&rdp->cblist));
+       WARN_ON_ONCE(!IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+                    count != 0 && rcu_segcblist_empty(&rdp->cblist));
 
-       local_irq_restore(flags);
+       rcu_nocb_unlock_irqrestore(rdp, flags);
 
        /* Re-invoke RCU core processing if there are callbacks remaining. */
-       if (rcu_segcblist_ready_cbs(&rdp->cblist))
+       if (!offloaded && rcu_segcblist_ready_cbs(&rdp->cblist))
                invoke_rcu_core();
 }
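Note: the tlimit exit in the invocation loop above calls local_clock() only once
per 32 invoked callbacks to keep the common case cheap.  The same test, restated
as a sketch with an up-counting "invoked" counter instead of the down-counting
rcl.len (illustration only, not patch code):

    invoked++;
    if (tlimit && (invoked & 31) == 0 && local_clock() >= tlimit)
            break;  /* time limit exceeded, stop invoking callbacks */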
 
@@ -2205,7 +2266,7 @@ static void force_qs_rnp(int (*f)(struct rcu_data *rdp))
                mask = 0;
                raw_spin_lock_irqsave_rcu_node(rnp, flags);
                if (rnp->qsmask == 0) {
-                       if (!IS_ENABLED(CONFIG_PREEMPT) ||
+                       if (!IS_ENABLED(CONFIG_PREEMPTION) ||
                            rcu_preempt_blocked_readers_cgp(rnp)) {
                                /*
                                 * No point in scanning bits because they
@@ -2280,6 +2341,8 @@ static __latent_entropy void rcu_core(void)
        unsigned long flags;
        struct rcu_data *rdp = raw_cpu_ptr(&rcu_data);
        struct rcu_node *rnp = rdp->mynode;
+       const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+                              rcu_segcblist_is_offloaded(&rdp->cblist);
 
        if (cpu_is_offline(smp_processor_id()))
                return;
@@ -2299,7 +2362,7 @@ static __latent_entropy void rcu_core(void)
 
        /* No grace period and unregistered callbacks? */
        if (!rcu_gp_in_progress() &&
-           rcu_segcblist_is_enabled(&rdp->cblist)) {
+           rcu_segcblist_is_enabled(&rdp->cblist) && !offloaded) {
                local_irq_save(flags);
                if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL))
                        rcu_accelerate_cbs_unlocked(rnp, rdp);
@@ -2309,7 +2372,7 @@ static __latent_entropy void rcu_core(void)
        rcu_check_gp_start_stall(rnp, rdp, rcu_jiffies_till_stall_check());
 
        /* If there are callbacks ready, invoke them. */
-       if (rcu_segcblist_ready_cbs(&rdp->cblist) &&
+       if (!offloaded && rcu_segcblist_ready_cbs(&rdp->cblist) &&
            likely(READ_ONCE(rcu_scheduler_fully_active)))
                rcu_do_batch(rdp);
 
@@ -2489,10 +2552,11 @@ static void rcu_leak_callback(struct rcu_head *rhp)
  * is expected to specify a CPU.
  */
 static void
-__call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
+__call_rcu(struct rcu_head *head, rcu_callback_t func, bool lazy)
 {
        unsigned long flags;
        struct rcu_data *rdp;
+       bool was_alldone;
 
        /* Misaligned rcu_head! */
        WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1));
@@ -2514,28 +2578,18 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
        rdp = this_cpu_ptr(&rcu_data);
 
        /* Add the callback to our list. */
-       if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist)) || cpu != -1) {
-               int offline;
-
-               if (cpu != -1)
-                       rdp = per_cpu_ptr(&rcu_data, cpu);
-               if (likely(rdp->mynode)) {
-                       /* Post-boot, so this should be for a no-CBs CPU. */
-                       offline = !__call_rcu_nocb(rdp, head, lazy, flags);
-                       WARN_ON_ONCE(offline);
-                       /* Offline CPU, _call_rcu() illegal, leak callback.  */
-                       local_irq_restore(flags);
-                       return;
-               }
-               /*
-                * Very early boot, before rcu_init().  Initialize if needed
-                * and then drop through to queue the callback.
-                */
-               WARN_ON_ONCE(cpu != -1);
+       if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist))) {
+               // This can trigger due to call_rcu() from offline CPU:
+               WARN_ON_ONCE(rcu_scheduler_active != RCU_SCHEDULER_INACTIVE);
                WARN_ON_ONCE(!rcu_is_watching());
+               // Very early boot, before rcu_init().  Initialize if needed
+               // and then drop through to queue the callback.
                if (rcu_segcblist_empty(&rdp->cblist))
                        rcu_segcblist_init(&rdp->cblist);
        }
+       if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
+               return; // Enqueued onto ->nocb_bypass, so just leave.
+       /* If a no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock. */
        rcu_segcblist_enqueue(&rdp->cblist, head, lazy);
        if (__is_kfree_rcu_offset((unsigned long)func))
                trace_rcu_kfree_callback(rcu_state.name, head,
@@ -2548,8 +2602,13 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
                                   rcu_segcblist_n_cbs(&rdp->cblist));
 
        /* Go handle any RCU core processing required. */
-       __call_rcu_core(rdp, head, flags);
-       local_irq_restore(flags);
+       if (IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+           unlikely(rcu_segcblist_is_offloaded(&rdp->cblist))) {
+               __call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */
+       } else {
+               __call_rcu_core(rdp, head, flags);
+               local_irq_restore(flags);
+       }
 }
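Note: condensed control flow of the rewritten __call_rcu() tail, for review only
("offloaded" here stands for the IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
rcu_segcblist_is_offloaded() test used elsewhere in this patch):

    if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
            return;                         /* queued on ->nocb_bypass */
    rcu_segcblist_enqueue(&rdp->cblist, head, lazy);
    if (offloaded) {
            __call_rcu_nocb_wake(rdp, was_alldone, flags);  /* unlocks */
    } else {
            __call_rcu_core(rdp, head, flags);
            local_irq_restore(flags);
    }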
 
 /**
@@ -2589,7 +2648,7 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy)
  */
 void call_rcu(struct rcu_head *head, rcu_callback_t func)
 {
-       __call_rcu(head, func, -1, 0);
+       __call_rcu(head, func, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu);
 
@@ -2602,7 +2661,7 @@ EXPORT_SYMBOL_GPL(call_rcu);
  */
 void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 {
-       __call_rcu(head, func, -1, 1);
+       __call_rcu(head, func, 1);
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
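Note: for reference, a typical caller of the two exports above (generic RCU usage,
not taken from this patch; "struct foo", foo_release() and foo_retire() are
made-up names, and the snippet assumes <linux/slab.h> and <linux/rcupdate.h>):

    struct foo {
            int data;
            struct rcu_head rcu;
    };

    static void foo_release(struct rcu_head *rhp)
    {
            kfree(container_of(rhp, struct foo, rcu));
    }

    static void foo_retire(struct foo *fp)
    {
            call_rcu(&fp->rcu, foo_release);  /* free after a grace period */
            /* Or, when the callback would only kfree() the object:
             * kfree_rcu(fp, rcu);
             */
    }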
 
@@ -2622,7 +2681,7 @@ static int rcu_blocking_is_gp(void)
 {
        int ret;
 
-       if (IS_ENABLED(CONFIG_PREEMPT))
+       if (IS_ENABLED(CONFIG_PREEMPTION))
                return rcu_scheduler_active == RCU_SCHEDULER_INACTIVE;
        might_sleep();  /* Check for RCU read-side critical section. */
        preempt_disable();
@@ -2735,6 +2794,10 @@ static int rcu_pending(void)
        /* Check for CPU stalls, if enabled. */
        check_cpu_stall(rdp);
 
+       /* Does this CPU need a deferred NOCB wakeup? */
+       if (rcu_nocb_need_deferred_wakeup(rdp))
+               return 1;
+
        /* Is this CPU a NO_HZ_FULL CPU that should ignore RCU? */
        if (rcu_nohz_full_cpu())
                return 0;
@@ -2750,6 +2813,8 @@ static int rcu_pending(void)
        /* Has RCU gone idle with this CPU needing another grace period? */
        if (!rcu_gp_in_progress() &&
            rcu_segcblist_is_enabled(&rdp->cblist) &&
+           (!IS_ENABLED(CONFIG_RCU_NOCB_CPU) ||
+            !rcu_segcblist_is_offloaded(&rdp->cblist)) &&
            !rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL))
                return 1;
 
@@ -2758,10 +2823,6 @@ static int rcu_pending(void)
            unlikely(READ_ONCE(rdp->gpwrap))) /* outside lock */
                return 1;
 
-       /* Does this CPU need a deferred NOCB wakeup? */
-       if (rcu_nocb_need_deferred_wakeup(rdp))
-               return 1;
-
        /* nothing to do */
        return 0;
 }
@@ -2801,6 +2862,8 @@ static void rcu_barrier_func(void *unused)
        rcu_barrier_trace(TPS("IRQ"), -1, rcu_state.barrier_sequence);
        rdp->barrier_head.func = rcu_barrier_callback;
        debug_rcu_head_queue(&rdp->barrier_head);
+       rcu_nocb_lock(rdp);
+       WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
        if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head, 0)) {
                atomic_inc(&rcu_state.barrier_cpu_count);
        } else {
@@ -2808,6 +2871,7 @@ static void rcu_barrier_func(void *unused)
                rcu_barrier_trace(TPS("IRQNQ"), -1,
                                   rcu_state.barrier_sequence);
        }
+       rcu_nocb_unlock(rdp);
 }
 
 /**
@@ -2858,22 +2922,11 @@ void rcu_barrier(void)
         * corresponding CPU's preceding callbacks have been invoked.
         */
        for_each_possible_cpu(cpu) {
-               if (!cpu_online(cpu) && !rcu_is_nocb_cpu(cpu))
-                       continue;
                rdp = per_cpu_ptr(&rcu_data, cpu);
-               if (rcu_is_nocb_cpu(cpu)) {
-                       if (!rcu_nocb_cpu_needs_barrier(cpu)) {
-                               rcu_barrier_trace(TPS("OfflineNoCB"), cpu,
-                                                  rcu_state.barrier_sequence);
-                       } else {
-                               rcu_barrier_trace(TPS("OnlineNoCB"), cpu,
-                                                  rcu_state.barrier_sequence);
-                               smp_mb__before_atomic();
-                               atomic_inc(&rcu_state.barrier_cpu_count);
-                               __call_rcu(&rdp->barrier_head,
-                                          rcu_barrier_callback, cpu, 0);
-                       }
-               } else if (rcu_segcblist_n_cbs(&rdp->cblist)) {
+               if (!cpu_online(cpu) &&
+                   !rcu_segcblist_is_offloaded(&rdp->cblist))
+                       continue;
+               if (rcu_segcblist_n_cbs(&rdp->cblist)) {
                        rcu_barrier_trace(TPS("OnlineQ"), cpu,
                                           rcu_state.barrier_sequence);
                        smp_call_function_single(cpu, rcu_barrier_func, NULL, 1);
@@ -2958,7 +3011,8 @@ rcu_boot_init_percpu_data(int cpu)
  * Initializes a CPU's per-CPU RCU data.  Note that only one online or
  * offline event can be happening at a given time.  Note also that we can
  * accept some slop in the rsp->gp_seq access due to the fact that this
- * CPU cannot possibly have any RCU callbacks in flight yet.
+ * CPU cannot possibly have any non-offloaded RCU callbacks in flight yet.
+ * And any offloaded callbacks are being numbered elsewhere.
  */
 int rcutree_prepare_cpu(unsigned int cpu)
 {
@@ -2972,7 +3026,7 @@ int rcutree_prepare_cpu(unsigned int cpu)
        rdp->n_force_qs_snap = rcu_state.n_force_qs;
        rdp->blimit = blimit;
        if (rcu_segcblist_empty(&rdp->cblist) && /* No early-boot CBs? */
-           !init_nocb_callback_list(rdp))
+           !rcu_segcblist_is_offloaded(&rdp->cblist))
                rcu_segcblist_init(&rdp->cblist);  /* Re-enable callbacks. */
        rdp->dynticks_nesting = 1;      /* CPU not up, no tearing. */
        rcu_dynticks_eqs_online();
@@ -3151,29 +3205,38 @@ void rcutree_migrate_callbacks(int cpu)
 {
        unsigned long flags;
        struct rcu_data *my_rdp;
+       struct rcu_node *my_rnp;
        struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
-       struct rcu_node *rnp_root = rcu_get_root();
        bool needwake;
 
-       if (rcu_is_nocb_cpu(cpu) || rcu_segcblist_empty(&rdp->cblist))
+       if (rcu_segcblist_is_offloaded(&rdp->cblist) ||
+           rcu_segcblist_empty(&rdp->cblist))
                return;  /* No callbacks to migrate. */
 
        local_irq_save(flags);
        my_rdp = this_cpu_ptr(&rcu_data);
-       if (rcu_nocb_adopt_orphan_cbs(my_rdp, rdp, flags)) {
-               local_irq_restore(flags);
-               return;
-       }
-       raw_spin_lock_rcu_node(rnp_root); /* irqs already disabled. */
+       my_rnp = my_rdp->mynode;
+       rcu_nocb_lock(my_rdp); /* irqs already disabled. */
+       WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies));
+       raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
        /* Leverage recent GPs and set GP for new callbacks. */
-       needwake = rcu_advance_cbs(rnp_root, rdp) ||
-                  rcu_advance_cbs(rnp_root, my_rdp);
+       needwake = rcu_advance_cbs(my_rnp, rdp) ||
+                  rcu_advance_cbs(my_rnp, my_rdp);
        rcu_segcblist_merge(&my_rdp->cblist, &rdp->cblist);
+       needwake = needwake || rcu_advance_cbs(my_rnp, my_rdp);
+       rcu_segcblist_disable(&rdp->cblist);
        WARN_ON_ONCE(rcu_segcblist_empty(&my_rdp->cblist) !=
                     !rcu_segcblist_n_cbs(&my_rdp->cblist));
-       raw_spin_unlock_irqrestore_rcu_node(rnp_root, flags);
+       if (rcu_segcblist_is_offloaded(&my_rdp->cblist)) {
+               raw_spin_unlock_rcu_node(my_rnp); /* irqs remain disabled. */
+               __call_rcu_nocb_wake(my_rdp, true, flags);
+       } else {
+               rcu_nocb_unlock(my_rdp); /* irqs remain disabled. */
+               raw_spin_unlock_irqrestore_rcu_node(my_rnp, flags);
+       }
        if (needwake)
                rcu_gp_kthread_wake();
+       lockdep_assert_irqs_enabled();
        WARN_ONCE(rcu_segcblist_n_cbs(&rdp->cblist) != 0 ||
                  !rcu_segcblist_empty(&rdp->cblist),
                  "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, 1stCB=%p\n",
@@ -3234,13 +3297,13 @@ static int __init rcu_spawn_gp_kthread(void)
        t = kthread_create(rcu_gp_kthread, NULL, "%s", rcu_state.name);
        if (WARN_ONCE(IS_ERR(t), "%s: Could not start grace-period kthread, OOM is now expected behavior\n", __func__))
                return 0;
-       rnp = rcu_get_root();
-       raw_spin_lock_irqsave_rcu_node(rnp, flags);
-       rcu_state.gp_kthread = t;
        if (kthread_prio) {
                sp.sched_priority = kthread_prio;
                sched_setscheduler_nocheck(t, SCHED_FIFO, &sp);
        }
+       rnp = rcu_get_root();
+       raw_spin_lock_irqsave_rcu_node(rnp, flags);
+       rcu_state.gp_kthread = t;
        raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
        wake_up_process(t);
        rcu_spawn_nocb_kthreads();