Merge branch 'core-rcu.2021.08.28a' of git://git.kernel.org/pub/scm/linux/kernel...
authorLinus Torvalds <torvalds@linux-foundation.org>
Mon, 30 Aug 2021 19:48:01 +0000 (12:48 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Mon, 30 Aug 2021 19:48:01 +0000 (12:48 -0700)
Pull RCU updates from Paul McKenney:
 "RCU changes for this cycle were:

   - Documentation updates

   - Miscellaneous fixes

   - Offloaded-callbacks updates

   - Updates to the nolibc library

   - Tasks-RCU updates

   - In-kernel torture-test updates

   - Torture-test scripting, perhaps most notably the pinning of
     torture-test guest OSes so as to force differences in memory
     latency. For example, in a two-socket system, a four-CPU guest OS
     will have one pair of its CPUs pinned to threads in a single core
     on one socket and the other pair pinned to threads in a single core
     on the other socket. This approach proved able to force race
     conditions that earlier testing missed. Some of these race
     conditions are still being tracked down"

* 'core-rcu.2021.08.28a' of git://git.kernel.org/pub/scm/linux/kernel/git/paulmck/linux-rcu: (61 commits)
  torture: Replace deprecated CPU-hotplug functions.
  rcu: Replace deprecated CPU-hotplug functions
  rcu: Print human-readable message for schedule() in RCU reader
  rcu: Explain why rcu_all_qs() is a stub in preemptible TREE RCU
  rcu: Use per_cpu_ptr to get the pointer of per_cpu variable
  rcu: Remove useless "ret" update in rcu_gp_fqs_loop()
  rcu: Mark accesses in tree_stall.h
  rcu: Make rcu_gp_init() and rcu_gp_fqs_loop noinline to conserve stack
  rcu: Mark lockless ->qsmask read in rcu_check_boost_fail()
  srcutiny: Mark read-side data races
  rcu: Start timing stall repetitions after warning complete
  rcu: Do not disable GP stall detection in rcu_cpu_stall_reset()
  rcu/tree: Handle VM stoppage in stall detection
  rculist: Unify documentation about missing list_empty_rcu()
  rcu: Mark accesses to ->rcu_read_lock_nesting
  rcu: Weaken ->dynticks accesses and updates
  rcu: Remove special bit at the bottom of the ->dynticks counter
  rcu: Fix stall-warning deadlock due to non-release of rcu_node ->lock
  rcu: Fix to include first blocked task in stall warning
  torture: Make kvm-test-1-run-qemu.sh check for reboot loops
  ...

41 files changed:
Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst
Documentation/RCU/Design/Requirements/Requirements.rst
Documentation/RCU/checklist.rst
Documentation/RCU/rcu_dereference.rst
Documentation/RCU/stallwarn.rst
include/linux/rculist.h
include/linux/rcupdate.h
include/linux/rcutiny.h
include/linux/srcutiny.h
kernel/locking/locktorture.c
kernel/rcu/rcuscale.c
kernel/rcu/rcutorture.c
kernel/rcu/refscale.c
kernel/rcu/srcutiny.c
kernel/rcu/tasks.h
kernel/rcu/tree.c
kernel/rcu/tree_nocb.h [new file with mode: 0644]
kernel/rcu/tree_plugin.h
kernel/rcu/tree_stall.h
kernel/scftorture.c
kernel/sched/core.c
kernel/torture.c
tools/include/nolibc/nolibc.h
tools/testing/selftests/rcutorture/bin/jitter.sh
tools/testing/selftests/rcutorture/bin/kcsan-collapse.sh
tools/testing/selftests/rcutorture/bin/kvm-again.sh
tools/testing/selftests/rcutorture/bin/kvm-assign-cpus.sh [new file with mode: 0755]
tools/testing/selftests/rcutorture/bin/kvm-get-cpus-script.sh [new file with mode: 0755]
tools/testing/selftests/rcutorture/bin/kvm-recheck-lock.sh
tools/testing/selftests/rcutorture/bin/kvm-recheck-scf.sh
tools/testing/selftests/rcutorture/bin/kvm-recheck.sh
tools/testing/selftests/rcutorture/bin/kvm-remote-noreap.sh [new file with mode: 0755]
tools/testing/selftests/rcutorture/bin/kvm-remote.sh
tools/testing/selftests/rcutorture/bin/kvm-test-1-run-batch.sh
tools/testing/selftests/rcutorture/bin/kvm-test-1-run-qemu.sh
tools/testing/selftests/rcutorture/bin/kvm-test-1-run.sh
tools/testing/selftests/rcutorture/bin/kvm.sh
tools/testing/selftests/rcutorture/bin/torture.sh
tools/testing/selftests/rcutorture/configs/rcu/RUDE01
tools/testing/selftests/rcutorture/configs/rcu/TASKS01
tools/testing/selftests/rcutorture/configs/rcu/TASKS03

index 11cdab0..eeb3512 100644 (file)
@@ -112,6 +112,35 @@ on PowerPC.
 The ``smp_mb__after_unlock_lock()`` invocations prevent this
 ``WARN_ON()`` from triggering.
 
++-----------------------------------------------------------------------+
+| **Quick Quiz**:                                                       |
++-----------------------------------------------------------------------+
+| But the chain of rcu_node-structure lock acquisitions guarantees      |
+| that new readers will see all of the updater's pre-grace-period       |
+| accesses and also guarantees that the updater's post-grace-period     |
+| accesses will see all of the old reader's accesses.  So why do we     |
+| need all of those calls to smp_mb__after_unlock_lock()?               |
++-----------------------------------------------------------------------+
+| **Answer**:                                                           |
++-----------------------------------------------------------------------+
+| Because we must provide ordering for RCU's polling grace-period       |
+| primitives, for example, get_state_synchronize_rcu() and              |
+| poll_state_synchronize_rcu().  Consider this code::                   |
+|                                                                       |
+|  CPU 0                                     CPU 1                      |
+|  ----                                      ----                       |
+|  WRITE_ONCE(X, 1)                          WRITE_ONCE(Y, 1)           |
+|  g = get_state_synchronize_rcu()           smp_mb()                   |
+|  while (!poll_state_synchronize_rcu(g))    r1 = READ_ONCE(X)          |
+|          continue;                                                    |
+|  r0 = READ_ONCE(Y)                                                    |
+|                                                                       |
+| RCU guarantees that the outcome r0 == 0 && r1 == 0 will not           |
+| happen, even if CPU 1 is in an RCU extended quiescent state           |
+| (idle or offline) and thus won't interact directly with the RCU       |
+| core processing at all.                                               |
++-----------------------------------------------------------------------+
+
 This approach must be extended to include idle CPUs, which need
 RCU's grace-period memory ordering guarantee to extend to any
 RCU read-side critical sections preceding and following the current
index 38a3947..45278e2 100644 (file)
@@ -362,9 +362,8 @@ do_something_gp() uses rcu_dereference() to fetch from ``gp``:
       12 }
 
 The rcu_dereference() uses volatile casts and (for DEC Alpha) memory
-barriers in the Linux kernel. Should a `high-quality implementation of
-C11 ``memory_order_consume``
-[PDF] <http://www.rdrop.com/users/paulmck/RCU/consume.2015.07.13a.pdf>`__
+barriers in the Linux kernel. Should a |high-quality implementation of
+C11 memory_order_consume [PDF]|_
 ever appear, then rcu_dereference() could be implemented as a
 ``memory_order_consume`` load. Regardless of the exact implementation, a
 pointer fetched by rcu_dereference() may not be used outside of the
@@ -374,6 +373,9 @@ element has been passed from RCU to some other synchronization
 mechanism, most commonly locking or `reference
 counting <https://www.kernel.org/doc/Documentation/RCU/rcuref.txt>`__.
 
+.. |high-quality implementation of C11 memory_order_consume [PDF]| replace:: high-quality implementation of C11 ``memory_order_consume`` [PDF]
+.. _high-quality implementation of C11 memory_order_consume [PDF]: http://www.rdrop.com/users/paulmck/RCU/consume.2015.07.13a.pdf
+
 In short, updaters use rcu_assign_pointer() and readers use
 rcu_dereference(), and these two RCU API elements work together to
 ensure that readers have a consistent view of newly added data elements.
index 01cc21f..f4545b7 100644 (file)
@@ -37,7 +37,7 @@ over a rather long period of time, but improvements are always welcome!
 
 1.     Does the update code have proper mutual exclusion?
 
-       RCU does allow -readers- to run (almost) naked, but -writers- must
+       RCU does allow *readers* to run (almost) naked, but *writers* must
        still use some sort of mutual exclusion, such as:
 
        a.      locking,
@@ -73,7 +73,7 @@ over a rather long period of time, but improvements are always welcome!
        critical section is every bit as bad as letting them leak out
        from under a lock.  Unless, of course, you have arranged some
        other means of protection, such as a lock or a reference count
-       -before- letting them out of the RCU read-side critical section.
+       *before* letting them out of the RCU read-side critical section.
 
 3.     Does the update code tolerate concurrent accesses?
 
@@ -101,7 +101,7 @@ over a rather long period of time, but improvements are always welcome!
        c.      Make updates appear atomic to readers.  For example,
                pointer updates to properly aligned fields will
                appear atomic, as will individual atomic primitives.
-               Sequences of operations performed under a lock will -not-
+               Sequences of operations performed under a lock will *not*
                appear to be atomic to RCU readers, nor will sequences
                of multiple atomic primitives.
 
@@ -333,7 +333,7 @@ over a rather long period of time, but improvements are always welcome!
        for example) may be omitted.
 
 10.    Conversely, if you are in an RCU read-side critical section,
-       and you don't hold the appropriate update-side lock, you -must-
+       and you don't hold the appropriate update-side lock, you *must*
        use the "_rcu()" variants of the list macros.  Failing to do so
        will break Alpha, cause aggressive compilers to generate bad code,
        and confuse people trying to read your code.
@@ -359,12 +359,12 @@ over a rather long period of time, but improvements are always welcome!
        callback pending, then that RCU callback will execute on some
        surviving CPU.  (If this was not the case, a self-spawning RCU
        callback would prevent the victim CPU from ever going offline.)
-       Furthermore, CPUs designated by rcu_nocbs= might well -always-
+       Furthermore, CPUs designated by rcu_nocbs= might well *always*
        have their RCU callbacks executed on some other CPUs, in fact,
        for some  real-time workloads, this is the whole point of using
        the rcu_nocbs= kernel boot parameter.
 
-13.    Unlike other forms of RCU, it -is- permissible to block in an
+13.    Unlike other forms of RCU, it *is* permissible to block in an
        SRCU read-side critical section (demarked by srcu_read_lock()
        and srcu_read_unlock()), hence the "SRCU": "sleepable RCU".
        Please note that if you don't need to sleep in read-side critical
@@ -411,16 +411,16 @@ over a rather long period of time, but improvements are always welcome!
 14.    The whole point of call_rcu(), synchronize_rcu(), and friends
        is to wait until all pre-existing readers have finished before
        carrying out some otherwise-destructive operation.  It is
-       therefore critically important to -first- remove any path
+       therefore critically important to *first* remove any path
        that readers can follow that could be affected by the
-       destructive operation, and -only- -then- invoke call_rcu(),
+       destructive operation, and *only then* invoke call_rcu(),
        synchronize_rcu(), or friends.
 
        Because these primitives only wait for pre-existing readers, it
        is the caller's responsibility to guarantee that any subsequent
        readers will execute safely.
 
-15.    The various RCU read-side primitives do -not- necessarily contain
+15.    The various RCU read-side primitives do *not* necessarily contain
        memory barriers.  You should therefore plan for the CPU
        and the compiler to freely reorder code into and out of RCU
        read-side critical sections.  It is the responsibility of the
@@ -459,8 +459,8 @@ over a rather long period of time, but improvements are always welcome!
        pass in a function defined within a loadable module, then it in
        necessary to wait for all pending callbacks to be invoked after
        the last invocation and before unloading that module.  Note that
-       it is absolutely -not- sufficient to wait for a grace period!
-       The current (say) synchronize_rcu() implementation is -not-
+       it is absolutely *not* sufficient to wait for a grace period!
+       The current (say) synchronize_rcu() implementation is *not*
        guaranteed to wait for callbacks registered on other CPUs.
        Or even on the current CPU if that CPU recently went offline
        and came back online.
@@ -470,7 +470,7 @@ over a rather long period of time, but improvements are always welcome!
        -       call_rcu() -> rcu_barrier()
        -       call_srcu() -> srcu_barrier()
 
-       However, these barrier functions are absolutely -not- guaranteed
+       However, these barrier functions are absolutely *not* guaranteed
        to wait for a grace period.  In fact, if there are no call_rcu()
        callbacks waiting anywhere in the system, rcu_barrier() is within
        its rights to return immediately.
index f3e587a..0b418a5 100644 (file)
@@ -43,7 +43,7 @@ Follow these rules to keep your RCU code working properly:
        -       Set bits and clear bits down in the must-be-zero low-order
                bits of that pointer.  This clearly means that the pointer
                must have alignment constraints, for example, this does
-               -not- work in general for char* pointers.
+               *not* work in general for char* pointers.
 
        -       XOR bits to translate pointers, as is done in some
                classic buddy-allocator algorithms.
@@ -174,7 +174,7 @@ Follow these rules to keep your RCU code working properly:
                Please see the "CONTROL DEPENDENCIES" section of
                Documentation/memory-barriers.txt for more details.
 
-       -       The pointers are not equal -and- the compiler does
+       -       The pointers are not equal *and* the compiler does
                not have enough information to deduce the value of the
                pointer.  Note that the volatile cast in rcu_dereference()
                will normally prevent the compiler from knowing too much.
@@ -360,7 +360,7 @@ in turn destroying the ordering between this load and the loads of the
 return values.  This can result in "p->b" returning pre-initialization
 garbage values.
 
-In short, rcu_dereference() is -not- optional when you are going to
+In short, rcu_dereference() is *not* optional when you are going to
 dereference the resulting pointer.
 
 
index 7148e9b..5036df2 100644 (file)
@@ -32,7 +32,7 @@ warnings:
 
 -      Booting Linux using a console connection that is too slow to
        keep up with the boot-time console-message rate.  For example,
-       a 115Kbaud serial console can be -way- too slow to keep up
+       a 115Kbaud serial console can be *way* too slow to keep up
        with boot-time message rates, and will frequently result in
        RCU CPU stall warning messages.  Especially if you have added
        debug printk()s.
@@ -105,7 +105,7 @@ warnings:
        leading the realization that the CPU had failed.
 
 The RCU, RCU-sched, and RCU-tasks implementations have CPU stall warning.
-Note that SRCU does -not- have CPU stall warnings.  Please note that
+Note that SRCU does *not* have CPU stall warnings.  Please note that
 RCU only detects CPU stalls when there is a grace period in progress.
 No grace period, no CPU stall warnings.
 
@@ -145,7 +145,7 @@ CONFIG_RCU_CPU_STALL_TIMEOUT
        this parameter is checked only at the beginning of a cycle.
        So if you are 10 seconds into a 40-second stall, setting this
        sysfs parameter to (say) five will shorten the timeout for the
-       -next- stall, or the following warning for the current stall
+       *next* stall, or the following warning for the current stall
        (assuming the stall lasts long enough).  It will not affect the
        timing of the next warning for the current stall.
 
@@ -189,8 +189,8 @@ rcupdate.rcu_task_stall_timeout
 Interpreting RCU's CPU Stall-Detector "Splats"
 ==============================================
 
-For non-RCU-tasks flavors of RCU, when a CPU detects that it is stalling,
-it will print a message similar to the following::
+For non-RCU-tasks flavors of RCU, when a CPU detects that some other
+CPU is stalling, it will print a message similar to the following::
 
        INFO: rcu_sched detected stalls on CPUs/tasks:
        2-...: (3 GPs behind) idle=06c/0/0 softirq=1453/1455 fqs=0
@@ -202,8 +202,10 @@ causing stalls, and that the stall was affecting RCU-sched.  This message
 will normally be followed by stack dumps for each CPU.  Please note that
 PREEMPT_RCU builds can be stalled by tasks as well as by CPUs, and that
 the tasks will be indicated by PID, for example, "P3421".  It is even
-possible for an rcu_state stall to be caused by both CPUs -and- tasks,
+possible for an rcu_state stall to be caused by both CPUs *and* tasks,
 in which case the offending CPUs and tasks will all be called out in the list.
+In some cases, CPUs will detect themselves stalling, which will result
+in a self-detected stall.
 
 CPU 2's "(3 GPs behind)" indicates that this CPU has not interacted with
 the RCU core for the past three grace periods.  In contrast, CPU 16's "(0
@@ -224,7 +226,7 @@ is the number that had executed since boot at the time that this CPU
 last noted the beginning of a grace period, which might be the current
 (stalled) grace period, or it might be some earlier grace period (for
 example, if the CPU might have been in dyntick-idle mode for an extended
-time period.  The number after the "/" is the number that have executed
+time period).  The number after the "/" is the number that have executed
 since boot until the current time.  If this latter number stays constant
 across repeated stall-warning messages, it is possible that RCU's softirq
 handlers are no longer able to execute on this CPU.  This can happen if
@@ -283,7 +285,8 @@ If the relevant grace-period kthread has been unable to run prior to
 the stall warning, as was the case in the "All QSes seen" line above,
 the following additional line is printed::
 
-       kthread starved for 23807 jiffies! g7075 f0x0 RCU_GP_WAIT_FQS(3) ->state=0x1 ->cpu=5
+       rcu_sched kthread starved for 23807 jiffies! g7075 f0x0 RCU_GP_WAIT_FQS(3) ->state=0x1 ->cpu=5
+       Unless rcu_sched kthread gets sufficient CPU time, OOM is now expected behavior.
 
 Starving the grace-period kthreads of CPU time can of course result
 in RCU CPU stall warnings even when all CPUs and tasks have passed
@@ -313,15 +316,21 @@ is the current ``TIMER_SOFTIRQ`` count on cpu 4.  If this value does not
 change on successive RCU CPU stall warnings, there is further reason to
 suspect a timer problem.
 
+These messages are usually followed by stack dumps of the CPUs and tasks
+involved in the stall.  These stack traces can help you locate the cause
+of the stall, keeping in mind that the CPU detecting the stall will have
+an interrupt frame that is mainly devoted to detecting the stall.
+
 
 Multiple Warnings From One Stall
 ================================
 
-If a stall lasts long enough, multiple stall-warning messages will be
-printed for it.  The second and subsequent messages are printed at
+If a stall lasts long enough, multiple stall-warning messages will
+be printed for it.  The second and subsequent messages are printed at
 longer intervals, so that the time between (say) the first and second
 message will be about three times the interval between the beginning
-of the stall and the first message.
+of the stall and the first message.  It can be helpful to compare the
+stack dumps for the different messages for the same stalled grace period.
 
 
 Stall Warnings for Expedited Grace Periods
index f8633d3..d29740b 100644 (file)
 #include <linux/list.h>
 #include <linux/rcupdate.h>
 
-/*
- * Why is there no list_empty_rcu()?  Because list_empty() serves this
- * purpose.  The list_empty() function fetches the RCU-protected pointer
- * and compares it to the address of the list head, but neither dereferences
- * this pointer itself nor provides this pointer to the caller.  Therefore,
- * it is not necessary to use rcu_dereference(), so that list_empty() can
- * be used anywhere you would want to use a list_empty_rcu().
- */
-
 /*
  * INIT_LIST_HEAD_RCU - Initialize a list_head visible to RCU readers
  * @list: list to be initialized
@@ -318,21 +309,29 @@ static inline void list_splice_tail_init_rcu(struct list_head *list,
 /*
  * Where are list_empty_rcu() and list_first_entry_rcu()?
  *
- * Implementing those functions following their counterparts list_empty() and
- * list_first_entry() is not advisable because they lead to subtle race
- * conditions as the following snippet shows:
+ * They do not exist because they would lead to subtle race conditions:
  *
  * if (!list_empty_rcu(mylist)) {
  *     struct foo *bar = list_first_entry_rcu(mylist, struct foo, list_member);
  *     do_something(bar);
  * }
  *
- * The list may not be empty when list_empty_rcu checks it, but it may be when
- * list_first_entry_rcu rereads the ->next pointer.
- *
- * Rereading the ->next pointer is not a problem for list_empty() and
- * list_first_entry() because they would be protected by a lock that blocks
- * writers.
+ * The list might be non-empty when list_empty_rcu() checks it, but it
+ * might have become empty by the time that list_first_entry_rcu() rereads
+ * the ->next pointer, which would result in a SEGV.
+ *
+ * When not using RCU, it is OK for list_first_entry() to re-read that
+ * pointer because both functions should be protected by some lock that
+ * blocks writers.
+ *
+ * When using RCU, list_empty() uses READ_ONCE() to fetch the
+ * RCU-protected ->next pointer and then compares it to the address of the
+ * list head.  However, it neither dereferences this pointer nor provides
+ * this pointer to its caller.  Thus, READ_ONCE() suffices (that is,
+ * rcu_dereference() is not needed), which means that list_empty() can be
+ * used anywhere you would want to use list_empty_rcu().  Just don't
+ * expect anything useful to happen if you do a subsequent lockless
+ * call to list_first_entry_rcu()!!!
  *
  * See list_first_or_null_rcu for an alternative.
  */
index d9680b7..434d12f 100644 (file)
@@ -53,7 +53,7 @@ void __rcu_read_unlock(void);
  * nesting depth, but makes sense only if CONFIG_PREEMPT_RCU -- in other
  * types of kernel builds, the rcu_read_lock() nesting depth is unknowable.
  */
-#define rcu_preempt_depth() (current->rcu_read_lock_nesting)
+#define rcu_preempt_depth() READ_ONCE(current->rcu_read_lock_nesting)
 
 #else /* #ifdef CONFIG_PREEMPT_RCU */
 
@@ -167,7 +167,7 @@ void synchronize_rcu_tasks(void);
 # define synchronize_rcu_tasks synchronize_rcu
 # endif
 
-# ifdef CONFIG_TASKS_RCU_TRACE
+# ifdef CONFIG_TASKS_TRACE_RCU
 # define rcu_tasks_trace_qs(t)                                         \
        do {                                                            \
                if (!likely(READ_ONCE((t)->trc_reader_checked)) &&      \
index 953e70f..9be0153 100644 (file)
@@ -14,9 +14,6 @@
 
 #include <asm/param.h> /* for HZ */
 
-/* Never flag non-existent other CPUs! */
-static inline bool rcu_eqs_special_set(int cpu) { return false; }
-
 unsigned long get_state_synchronize_rcu(void);
 unsigned long start_poll_synchronize_rcu(void);
 bool poll_state_synchronize_rcu(unsigned long oldstate);
index 0e0cf4d..6cfaa0a 100644 (file)
@@ -61,7 +61,7 @@ static inline int __srcu_read_lock(struct srcu_struct *ssp)
        int idx;
 
        idx = ((READ_ONCE(ssp->srcu_idx) + 1) & 0x2) >> 1;
-       WRITE_ONCE(ssp->srcu_lock_nesting[idx], ssp->srcu_lock_nesting[idx] + 1);
+       WRITE_ONCE(ssp->srcu_lock_nesting[idx], READ_ONCE(ssp->srcu_lock_nesting[idx]) + 1);
        return idx;
 }
 
@@ -81,11 +81,11 @@ static inline void srcu_torture_stats_print(struct srcu_struct *ssp,
 {
        int idx;
 
-       idx = ((READ_ONCE(ssp->srcu_idx) + 1) & 0x2) >> 1;
+       idx = ((data_race(READ_ONCE(ssp->srcu_idx)) + 1) & 0x2) >> 1;
        pr_alert("%s%s Tiny SRCU per-CPU(idx=%d): (%hd,%hd)\n",
                 tt, tf, idx,
-                READ_ONCE(ssp->srcu_lock_nesting[!idx]),
-                READ_ONCE(ssp->srcu_lock_nesting[idx]));
+                data_race(READ_ONCE(ssp->srcu_lock_nesting[!idx])),
+                data_race(READ_ONCE(ssp->srcu_lock_nesting[idx])));
 }
 
 #endif
index b3adb40..7c5a4a0 100644 (file)
@@ -59,7 +59,7 @@ static struct task_struct **writer_tasks;
 static struct task_struct **reader_tasks;
 
 static bool lock_is_write_held;
-static bool lock_is_read_held;
+static atomic_t lock_is_read_held;
 static unsigned long last_lock_release;
 
 struct lock_stress_stats {
@@ -682,7 +682,7 @@ static int lock_torture_writer(void *arg)
                if (WARN_ON_ONCE(lock_is_write_held))
                        lwsp->n_lock_fail++;
                lock_is_write_held = true;
-               if (WARN_ON_ONCE(lock_is_read_held))
+               if (WARN_ON_ONCE(atomic_read(&lock_is_read_held)))
                        lwsp->n_lock_fail++; /* rare, but... */
 
                lwsp->n_lock_acquired++;
@@ -717,13 +717,13 @@ static int lock_torture_reader(void *arg)
                        schedule_timeout_uninterruptible(1);
 
                cxt.cur_ops->readlock(tid);
-               lock_is_read_held = true;
+               atomic_inc(&lock_is_read_held);
                if (WARN_ON_ONCE(lock_is_write_held))
                        lrsp->n_lock_fail++; /* rare, but... */
 
                lrsp->n_lock_acquired++;
                cxt.cur_ops->read_delay(&rand);
-               lock_is_read_held = false;
+               atomic_dec(&lock_is_read_held);
                cxt.cur_ops->readunlock(tid);
 
                stutter_wait("lock_torture_reader");
@@ -738,20 +738,22 @@ static int lock_torture_reader(void *arg)
 static void __torture_print_stats(char *page,
                                  struct lock_stress_stats *statp, bool write)
 {
+       long cur;
        bool fail = false;
        int i, n_stress;
-       long max = 0, min = statp ? statp[0].n_lock_acquired : 0;
+       long max = 0, min = statp ? data_race(statp[0].n_lock_acquired) : 0;
        long long sum = 0;
 
        n_stress = write ? cxt.nrealwriters_stress : cxt.nrealreaders_stress;
        for (i = 0; i < n_stress; i++) {
-               if (statp[i].n_lock_fail)
+               if (data_race(statp[i].n_lock_fail))
                        fail = true;
-               sum += statp[i].n_lock_acquired;
-               if (max < statp[i].n_lock_acquired)
-                       max = statp[i].n_lock_acquired;
-               if (min > statp[i].n_lock_acquired)
-                       min = statp[i].n_lock_acquired;
+               cur = data_race(statp[i].n_lock_acquired);
+               sum += cur;
+               if (max < cur)
+                       max = cur;
+               if (min > cur)
+                       min = cur;
        }
        page += sprintf(page,
                        "%s:  Total: %lld  Max/Min: %ld/%ld %s  Fail: %d %s\n",
@@ -996,7 +998,6 @@ static int __init lock_torture_init(void)
                }
 
                if (nreaders_stress) {
-                       lock_is_read_held = false;
                        cxt.lrsa = kmalloc_array(cxt.nrealreaders_stress,
                                                 sizeof(*cxt.lrsa),
                                                 GFP_KERNEL);
index dca51fe..2cc34a2 100644 (file)
@@ -487,7 +487,7 @@ retry:
        if (gp_async) {
                cur_ops->gp_barrier();
        }
-       writer_n_durations[me] = i_max;
+       writer_n_durations[me] = i_max + 1;
        torture_kthread_stopping("rcu_scale_writer");
        return 0;
 }
@@ -561,7 +561,7 @@ rcu_scale_cleanup(void)
                        wdpp = writer_durations[i];
                        if (!wdpp)
                                continue;
-                       for (j = 0; j <= writer_n_durations[i]; j++) {
+                       for (j = 0; j < writer_n_durations[i]; j++) {
                                wdp = &wdpp[j];
                                pr_alert("%s%s %4d writer-duration: %5d %llu\n",
                                        scale_type, SCALE_FLAG,
index 40ef541..ab42152 100644 (file)
@@ -2022,8 +2022,13 @@ static int rcu_torture_stall(void *args)
                          __func__, raw_smp_processor_id());
                while (ULONG_CMP_LT((unsigned long)ktime_get_seconds(),
                                    stop_at))
-                       if (stall_cpu_block)
+                       if (stall_cpu_block) {
+#ifdef CONFIG_PREEMPTION
+                               preempt_schedule();
+#else
                                schedule_timeout_uninterruptible(HZ);
+#endif
+                       }
                if (stall_cpu_irqsoff)
                        local_irq_enable();
                else if (!stall_cpu_block)
index d998a76..66dc14c 100644 (file)
@@ -467,6 +467,40 @@ static struct ref_scale_ops acqrel_ops = {
        .name           = "acqrel"
 };
 
+static volatile u64 stopopts;
+
+static void ref_clock_section(const int nloops)
+{
+       u64 x = 0;
+       int i;
+
+       preempt_disable();
+       for (i = nloops; i >= 0; i--)
+               x += ktime_get_real_fast_ns();
+       preempt_enable();
+       stopopts = x;
+}
+
+static void ref_clock_delay_section(const int nloops, const int udl, const int ndl)
+{
+       u64 x = 0;
+       int i;
+
+       preempt_disable();
+       for (i = nloops; i >= 0; i--) {
+               x += ktime_get_real_fast_ns();
+               un_delay(udl, ndl);
+       }
+       preempt_enable();
+       stopopts = x;
+}
+
+static struct ref_scale_ops clock_ops = {
+       .readsection    = ref_clock_section,
+       .delaysection   = ref_clock_delay_section,
+       .name           = "clock"
+};
+
 static void rcu_scale_one_reader(void)
 {
        if (readdelay <= 0)
@@ -759,7 +793,7 @@ ref_scale_init(void)
        int firsterr = 0;
        static struct ref_scale_ops *scale_ops[] = {
                &rcu_ops, &srcu_ops, &rcu_trace_ops, &rcu_tasks_ops, &refcnt_ops, &rwlock_ops,
-               &rwsem_ops, &lock_ops, &lock_irq_ops, &acqrel_ops,
+               &rwsem_ops, &lock_ops, &lock_irq_ops, &acqrel_ops, &clock_ops,
        };
 
        if (!torture_init_begin(scale_type, verbose))
index 26344dc..a0ba2ed 100644 (file)
@@ -96,7 +96,7 @@ EXPORT_SYMBOL_GPL(cleanup_srcu_struct);
  */
 void __srcu_read_unlock(struct srcu_struct *ssp, int idx)
 {
-       int newval = ssp->srcu_lock_nesting[idx] - 1;
+       int newval = READ_ONCE(ssp->srcu_lock_nesting[idx]) - 1;
 
        WRITE_ONCE(ssp->srcu_lock_nesting[idx], newval);
        if (!newval && READ_ONCE(ssp->srcu_gp_waiting))
index 8536c55..806160c 100644 (file)
@@ -643,8 +643,8 @@ void exit_tasks_rcu_finish(void) { exit_tasks_rcu_finish_trace(current); }
 //
 // "Rude" variant of Tasks RCU, inspired by Steve Rostedt's trick of
 // passing an empty function to schedule_on_each_cpu().  This approach
-// provides an asynchronous call_rcu_tasks_rude() API and batching
-// of concurrent calls to the synchronous synchronize_rcu_rude() API.
+// provides an asynchronous call_rcu_tasks_rude() API and batching of
+// concurrent calls to the synchronous synchronize_rcu_tasks_rude() API.
 // This invokes schedule_on_each_cpu() in order to send IPIs far and wide
 // and induces otherwise unnecessary context switches on all online CPUs,
 // whether idle or not.
@@ -785,7 +785,10 @@ EXPORT_SYMBOL_GPL(show_rcu_tasks_rude_gp_kthread);
 //     set that task's .need_qs flag so that task's next outermost
 //     rcu_read_unlock_trace() will report the quiescent state (in which
 //     case the count of readers is incremented).  If both attempts fail,
-//     the task is added to a "holdout" list.
+//     the task is added to a "holdout" list.  Note that IPIs are used
+//     to invoke trc_read_check_handler() in the context of running tasks
+//     in order to avoid ordering overhead on common-case shared-variable
+//     accessses.
 // rcu_tasks_trace_postscan():
 //     Initialize state and attempt to identify an immediate quiescent
 //     state as above (but only for idle tasks), unblock CPU-hotplug
@@ -847,7 +850,7 @@ static DEFINE_IRQ_WORK(rcu_tasks_trace_iw, rcu_read_unlock_iw);
 /* If we are the last reader, wake up the grace-period kthread. */
 void rcu_read_unlock_trace_special(struct task_struct *t, int nesting)
 {
-       int nq = t->trc_reader_special.b.need_qs;
+       int nq = READ_ONCE(t->trc_reader_special.b.need_qs);
 
        if (IS_ENABLED(CONFIG_TASKS_TRACE_RCU_READ_MB) &&
            t->trc_reader_special.b.need_mb)
@@ -894,7 +897,7 @@ static void trc_read_check_handler(void *t_in)
 
        // If the task is not in a read-side critical section, and
        // if this is the last reader, awaken the grace-period kthread.
-       if (likely(!t->trc_reader_nesting)) {
+       if (likely(!READ_ONCE(t->trc_reader_nesting))) {
                if (WARN_ON_ONCE(atomic_dec_and_test(&trc_n_readers_need_end)))
                        wake_up(&trc_wait);
                // Mark as checked after decrement to avoid false
@@ -903,7 +906,7 @@ static void trc_read_check_handler(void *t_in)
                goto reset_ipi;
        }
        // If we are racing with an rcu_read_unlock_trace(), try again later.
-       if (unlikely(t->trc_reader_nesting < 0)) {
+       if (unlikely(READ_ONCE(t->trc_reader_nesting) < 0)) {
                if (WARN_ON_ONCE(atomic_dec_and_test(&trc_n_readers_need_end)))
                        wake_up(&trc_wait);
                goto reset_ipi;
@@ -913,14 +916,14 @@ static void trc_read_check_handler(void *t_in)
        // Get here if the task is in a read-side critical section.  Set
        // its state so that it will awaken the grace-period kthread upon
        // exit from that critical section.
-       WARN_ON_ONCE(t->trc_reader_special.b.need_qs);
+       WARN_ON_ONCE(READ_ONCE(t->trc_reader_special.b.need_qs));
        WRITE_ONCE(t->trc_reader_special.b.need_qs, true);
 
 reset_ipi:
        // Allow future IPIs to be sent on CPU and for task.
        // Also order this IPI handler against any later manipulations of
        // the intended task.
-       smp_store_release(&per_cpu(trc_ipi_to_cpu, smp_processor_id()), false); // ^^^
+       smp_store_release(per_cpu_ptr(&trc_ipi_to_cpu, smp_processor_id()), false); // ^^^
        smp_store_release(&texp->trc_ipi_to_cpu, -1); // ^^^
 }
 
@@ -950,6 +953,7 @@ static bool trc_inspect_reader(struct task_struct *t, void *arg)
                        n_heavy_reader_ofl_updates++;
                in_qs = true;
        } else {
+               // The task is not running, so C-language access is safe.
                in_qs = likely(!t->trc_reader_nesting);
        }
 
@@ -964,7 +968,7 @@ static bool trc_inspect_reader(struct task_struct *t, void *arg)
        // state so that it will awaken the grace-period kthread upon exit
        // from that critical section.
        atomic_inc(&trc_n_readers_need_end); // One more to wait on.
-       WARN_ON_ONCE(t->trc_reader_special.b.need_qs);
+       WARN_ON_ONCE(READ_ONCE(t->trc_reader_special.b.need_qs));
        WRITE_ONCE(t->trc_reader_special.b.need_qs, true);
        return true;
 }
@@ -982,7 +986,7 @@ static void trc_wait_for_one_reader(struct task_struct *t,
        // The current task had better be in a quiescent state.
        if (t == current) {
                t->trc_reader_checked = true;
-               WARN_ON_ONCE(t->trc_reader_nesting);
+               WARN_ON_ONCE(READ_ONCE(t->trc_reader_nesting));
                return;
        }
 
@@ -994,6 +998,12 @@ static void trc_wait_for_one_reader(struct task_struct *t,
        }
        put_task_struct(t);
 
+       // If this task is not yet on the holdout list, then we are in
+       // an RCU read-side critical section.  Otherwise, the invocation of
+       // rcu_add_holdout() that added it to the list did the necessary
+       // get_task_struct().  Either way, the task cannot be freed out
+       // from under this code.
+
        // If currently running, send an IPI, either way, add to list.
        trc_add_holdout(t, bhp);
        if (task_curr(t) &&
@@ -1092,8 +1102,8 @@ static void show_stalled_task_trace(struct task_struct *t, bool *firstreport)
                 ".I"[READ_ONCE(t->trc_ipi_to_cpu) > 0],
                 ".i"[is_idle_task(t)],
                 ".N"[cpu > 0 && tick_nohz_full_cpu(cpu)],
-                t->trc_reader_nesting,
-                " N"[!!t->trc_reader_special.b.need_qs],
+                READ_ONCE(t->trc_reader_nesting),
+                " N"[!!READ_ONCE(t->trc_reader_special.b.need_qs)],
                 cpu);
        sched_show_task(t);
 }
@@ -1187,7 +1197,7 @@ static void rcu_tasks_trace_postgp(struct rcu_tasks *rtp)
 static void exit_tasks_rcu_finish_trace(struct task_struct *t)
 {
        WRITE_ONCE(t->trc_reader_checked, true);
-       WARN_ON_ONCE(t->trc_reader_nesting);
+       WARN_ON_ONCE(READ_ONCE(t->trc_reader_nesting));
        WRITE_ONCE(t->trc_reader_nesting, 0);
        if (WARN_ON_ONCE(READ_ONCE(t->trc_reader_special.b.need_qs)))
                rcu_read_unlock_trace_special(t, 0);
index 51f24ec..bce848e 100644 (file)
 
 /* Data structures. */
 
-/*
- * Steal a bit from the bottom of ->dynticks for idle entry/exit
- * control.  Initially this is for TLB flushing.
- */
-#define RCU_DYNTICK_CTRL_MASK 0x1
-#define RCU_DYNTICK_CTRL_CTR  (RCU_DYNTICK_CTRL_MASK + 1)
-
 static DEFINE_PER_CPU_SHARED_ALIGNED(struct rcu_data, rcu_data) = {
        .dynticks_nesting = 1,
        .dynticks_nmi_nesting = DYNTICK_IRQ_NONIDLE,
-       .dynticks = ATOMIC_INIT(RCU_DYNTICK_CTRL_CTR),
+       .dynticks = ATOMIC_INIT(1),
 #ifdef CONFIG_RCU_NOCB_CPU
        .cblist.flags = SEGCBLIST_SOFTIRQ_ONLY,
 #endif
@@ -258,6 +251,15 @@ void rcu_softirq_qs(void)
        rcu_tasks_qs(current, false);
 }
 
+/*
+ * Increment the current CPU's rcu_data structure's ->dynticks field
+ * with ordering.  Return the new value.
+ */
+static noinline noinstr unsigned long rcu_dynticks_inc(int incby)
+{
+       return arch_atomic_add_return(incby, this_cpu_ptr(&rcu_data.dynticks));
+}
+
 /*
  * Record entry into an extended quiescent state.  This is only to be
  * called when not already in an extended quiescent state, that is,
@@ -266,7 +268,6 @@ void rcu_softirq_qs(void)
  */
 static noinstr void rcu_dynticks_eqs_enter(void)
 {
-       struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
        int seq;
 
        /*
@@ -275,13 +276,9 @@ static noinstr void rcu_dynticks_eqs_enter(void)
         * next idle sojourn.
         */
        rcu_dynticks_task_trace_enter();  // Before ->dynticks update!
-       seq = arch_atomic_add_return(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
+       seq = rcu_dynticks_inc(1);
        // RCU is no longer watching.  Better be in extended quiescent state!
-       WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
-                    (seq & RCU_DYNTICK_CTRL_CTR));
-       /* Better not have special action (TLB flush) pending! */
-       WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
-                    (seq & RCU_DYNTICK_CTRL_MASK));
+       WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) && (seq & 0x1));
 }
 
 /*
@@ -291,7 +288,6 @@ static noinstr void rcu_dynticks_eqs_enter(void)
  */
 static noinstr void rcu_dynticks_eqs_exit(void)
 {
-       struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
        int seq;
 
        /*
@@ -299,15 +295,10 @@ static noinstr void rcu_dynticks_eqs_exit(void)
         * and we also must force ordering with the next RCU read-side
         * critical section.
         */
-       seq = arch_atomic_add_return(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
+       seq = rcu_dynticks_inc(1);
        // RCU is now watching.  Better not be in an extended quiescent state!
        rcu_dynticks_task_trace_exit();  // After ->dynticks update!
-       WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) &&
-                    !(seq & RCU_DYNTICK_CTRL_CTR));
-       if (seq & RCU_DYNTICK_CTRL_MASK) {
-               arch_atomic_andnot(RCU_DYNTICK_CTRL_MASK, &rdp->dynticks);
-               smp_mb__after_atomic(); /* _exit after clearing mask. */
-       }
+       WARN_ON_ONCE(IS_ENABLED(CONFIG_RCU_EQS_DEBUG) && !(seq & 0x1));
 }
 
 /*
@@ -324,9 +315,9 @@ static void rcu_dynticks_eqs_online(void)
 {
        struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 
-       if (atomic_read(&rdp->dynticks) & RCU_DYNTICK_CTRL_CTR)
+       if (atomic_read(&rdp->dynticks) & 0x1)
                return;
-       atomic_add(RCU_DYNTICK_CTRL_CTR, &rdp->dynticks);
+       rcu_dynticks_inc(1);
 }
 
 /*
@@ -336,9 +327,7 @@ static void rcu_dynticks_eqs_online(void)
  */
 static __always_inline bool rcu_dynticks_curr_cpu_in_eqs(void)
 {
-       struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
-
-       return !(arch_atomic_read(&rdp->dynticks) & RCU_DYNTICK_CTRL_CTR);
+       return !(atomic_read(this_cpu_ptr(&rcu_data.dynticks)) & 0x1);
 }
 
 /*
@@ -347,9 +336,8 @@ static __always_inline bool rcu_dynticks_curr_cpu_in_eqs(void)
  */
 static int rcu_dynticks_snap(struct rcu_data *rdp)
 {
-       int snap = atomic_add_return(0, &rdp->dynticks);
-
-       return snap & ~RCU_DYNTICK_CTRL_MASK;
+       smp_mb();  // Fundamental RCU ordering guarantee.
+       return atomic_read_acquire(&rdp->dynticks);
 }
 
 /*
@@ -358,7 +346,7 @@ static int rcu_dynticks_snap(struct rcu_data *rdp)
  */
 static bool rcu_dynticks_in_eqs(int snap)
 {
-       return !(snap & RCU_DYNTICK_CTRL_CTR);
+       return !(snap & 0x1);
 }
 
 /* Return true if the specified CPU is currently idle from an RCU viewpoint.  */
@@ -389,8 +377,7 @@ bool rcu_dynticks_zero_in_eqs(int cpu, int *vp)
        int snap;
 
        // If not quiescent, force back to earlier extended quiescent state.
-       snap = atomic_read(&rdp->dynticks) & ~(RCU_DYNTICK_CTRL_MASK |
-                                              RCU_DYNTICK_CTRL_CTR);
+       snap = atomic_read(&rdp->dynticks) & ~0x1;
 
        smp_rmb(); // Order ->dynticks and *vp reads.
        if (READ_ONCE(*vp))
@@ -398,32 +385,7 @@ bool rcu_dynticks_zero_in_eqs(int cpu, int *vp)
        smp_rmb(); // Order *vp read and ->dynticks re-read.
 
        // If still in the same extended quiescent state, we are good!
-       return snap == (atomic_read(&rdp->dynticks) & ~RCU_DYNTICK_CTRL_MASK);
-}
-
-/*
- * Set the special (bottom) bit of the specified CPU so that it
- * will take special action (such as flushing its TLB) on the
- * next exit from an extended quiescent state.  Returns true if
- * the bit was successfully set, or false if the CPU was not in
- * an extended quiescent state.
- */
-bool rcu_eqs_special_set(int cpu)
-{
-       int old;
-       int new;
-       int new_old;
-       struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
-
-       new_old = atomic_read(&rdp->dynticks);
-       do {
-               old = new_old;
-               if (old & RCU_DYNTICK_CTRL_CTR)
-                       return false;
-               new = old | RCU_DYNTICK_CTRL_MASK;
-               new_old = atomic_cmpxchg(&rdp->dynticks, old, new);
-       } while (new_old != old);
-       return true;
+       return snap == atomic_read(&rdp->dynticks);
 }
 
 /*
@@ -439,13 +401,12 @@ bool rcu_eqs_special_set(int cpu)
  */
 notrace void rcu_momentary_dyntick_idle(void)
 {
-       int special;
+       int seq;
 
        raw_cpu_write(rcu_data.rcu_need_heavy_qs, false);
-       special = atomic_add_return(2 * RCU_DYNTICK_CTRL_CTR,
-                                   &this_cpu_ptr(&rcu_data)->dynticks);
+       seq = rcu_dynticks_inc(2);
        /* It is illegal to call this from idle state. */
-       WARN_ON_ONCE(!(special & RCU_DYNTICK_CTRL_CTR));
+       WARN_ON_ONCE(!(seq & 0x1));
        rcu_preempt_deferred_qs(current);
 }
 EXPORT_SYMBOL_GPL(rcu_momentary_dyntick_idle);
@@ -1325,7 +1286,7 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp)
         */
        jtsq = READ_ONCE(jiffies_to_sched_qs);
        ruqp = per_cpu_ptr(&rcu_data.rcu_urgent_qs, rdp->cpu);
-       rnhqp = &per_cpu(rcu_data.rcu_need_heavy_qs, rdp->cpu);
+       rnhqp = per_cpu_ptr(&rcu_data.rcu_need_heavy_qs, rdp->cpu);
        if (!READ_ONCE(*rnhqp) &&
            (time_after(jiffies, rcu_state.gp_start + jtsq * 2) ||
             time_after(jiffies, rcu_state.jiffies_resched) ||
@@ -1772,7 +1733,7 @@ static void rcu_strict_gp_boundary(void *unused)
 /*
  * Initialize a new grace period.  Return false if no grace period required.
  */
-static bool rcu_gp_init(void)
+static noinline_for_stack bool rcu_gp_init(void)
 {
        unsigned long firstseq;
        unsigned long flags;
@@ -1966,7 +1927,7 @@ static void rcu_gp_fqs(bool first_time)
 /*
  * Loop doing repeated quiescent-state forcing until the grace period ends.
  */
-static void rcu_gp_fqs_loop(void)
+static noinline_for_stack void rcu_gp_fqs_loop(void)
 {
        bool first_gp_fqs;
        int gf = 0;
@@ -1993,8 +1954,8 @@ static void rcu_gp_fqs_loop(void)
                trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq,
                                       TPS("fqswait"));
                WRITE_ONCE(rcu_state.gp_state, RCU_GP_WAIT_FQS);
-               ret = swait_event_idle_timeout_exclusive(
-                               rcu_state.gp_wq, rcu_gp_fqs_check_wake(&gf), j);
+               (void)swait_event_idle_timeout_exclusive(rcu_state.gp_wq,
+                                rcu_gp_fqs_check_wake(&gf), j);
                rcu_gp_torture_wait();
                WRITE_ONCE(rcu_state.gp_state, RCU_GP_DOING_FQS);
                /* Locking provides needed memory barriers. */
@@ -2471,9 +2432,6 @@ int rcutree_dead_cpu(unsigned int cpu)
        WRITE_ONCE(rcu_state.n_online_cpus, rcu_state.n_online_cpus - 1);
        /* Adjust any no-longer-needed kthreads. */
        rcu_boost_kthread_setaffinity(rnp, -1);
-       /* Do any needed no-CB deferred wakeups from this CPU. */
-       do_nocb_deferred_wakeup(per_cpu_ptr(&rcu_data, cpu));
-
        // Stop-machine done, so allow nohz_full to disable tick.
        tick_dep_clear(TICK_DEP_BIT_RCU);
        return 0;
@@ -4050,7 +4008,7 @@ void rcu_barrier(void)
         */
        init_completion(&rcu_state.barrier_completion);
        atomic_set(&rcu_state.barrier_cpu_count, 2);
-       get_online_cpus();
+       cpus_read_lock();
 
        /*
         * Force each CPU with callbacks to register a new callback.
@@ -4081,7 +4039,7 @@ void rcu_barrier(void)
                                          rcu_state.barrier_sequence);
                }
        }
-       put_online_cpus();
+       cpus_read_unlock();
 
        /*
         * Now that we have an rcu_barrier_callback() callback on each
@@ -4784,4 +4742,5 @@ void __init rcu_init(void)
 
 #include "tree_stall.h"
 #include "tree_exp.h"
+#include "tree_nocb.h"
 #include "tree_plugin.h"
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
new file mode 100644 (file)
index 0000000..8fdf44f
--- /dev/null
@@ -0,0 +1,1496 @@
+/* SPDX-License-Identifier: GPL-2.0+ */
+/*
+ * Read-Copy Update mechanism for mutual exclusion (tree-based version)
+ * Internal non-public definitions that provide either classic
+ * or preemptible semantics.
+ *
+ * Copyright Red Hat, 2009
+ * Copyright IBM Corporation, 2009
+ * Copyright SUSE, 2021
+ *
+ * Author: Ingo Molnar <mingo@elte.hu>
+ *        Paul E. McKenney <paulmck@linux.ibm.com>
+ *        Frederic Weisbecker <frederic@kernel.org>
+ */
+
+#ifdef CONFIG_RCU_NOCB_CPU
+static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */
+static bool __read_mostly rcu_nocb_poll;    /* Offload kthread are to poll. */
+static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
+{
+       return lockdep_is_held(&rdp->nocb_lock);
+}
+
+static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
+{
+       /* Race on early boot between thread creation and assignment */
+       if (!rdp->nocb_cb_kthread || !rdp->nocb_gp_kthread)
+               return true;
+
+       if (current == rdp->nocb_cb_kthread || current == rdp->nocb_gp_kthread)
+               if (in_task())
+                       return true;
+       return false;
+}
+
+/*
+ * Offload callback processing from the boot-time-specified set of CPUs
+ * specified by rcu_nocb_mask.  For the CPUs in the set, there are kthreads
+ * created that pull the callbacks from the corresponding CPU, wait for
+ * a grace period to elapse, and invoke the callbacks.  These kthreads
+ * are organized into GP kthreads, which manage incoming callbacks, wait for
+ * grace periods, and awaken CB kthreads, and the CB kthreads, which only
+ * invoke callbacks.  Each GP kthread invokes its own CBs.  The no-CBs CPUs
+ * do a wake_up() on their GP kthread when they insert a callback into any
+ * empty list, unless the rcu_nocb_poll boot parameter has been specified,
+ * in which case each kthread actively polls its CPU.  (Which isn't so great
+ * for energy efficiency, but which does reduce RCU's overhead on that CPU.)
+ *
+ * This is intended to be used in conjunction with Frederic Weisbecker's
+ * adaptive-idle work, which would seriously reduce OS jitter on CPUs
+ * running CPU-bound user-mode computations.
+ *
+ * Offloading of callbacks can also be used as an energy-efficiency
+ * measure because CPUs with no RCU callbacks queued are more aggressive
+ * about entering dyntick-idle mode.
+ */
+
+
+/*
+ * Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters.
+ * If the list is invalid, a warning is emitted and all CPUs are offloaded.
+ */
+static int __init rcu_nocb_setup(char *str)
+{
+       alloc_bootmem_cpumask_var(&rcu_nocb_mask);
+       if (cpulist_parse(str, rcu_nocb_mask)) {
+               pr_warn("rcu_nocbs= bad CPU range, all CPUs set\n");
+               cpumask_setall(rcu_nocb_mask);
+       }
+       return 1;
+}
+__setup("rcu_nocbs=", rcu_nocb_setup);
+
+static int __init parse_rcu_nocb_poll(char *arg)
+{
+       rcu_nocb_poll = true;
+       return 0;
+}
+early_param("rcu_nocb_poll", parse_rcu_nocb_poll);
+
+/*
+ * Don't bother bypassing ->cblist if the call_rcu() rate is low.
+ * After all, the main point of bypassing is to avoid lock contention
+ * on ->nocb_lock, which only can happen at high call_rcu() rates.
+ */
+static int nocb_nobypass_lim_per_jiffy = 16 * 1000 / HZ;
+module_param(nocb_nobypass_lim_per_jiffy, int, 0);
+
+/*
+ * Acquire the specified rcu_data structure's ->nocb_bypass_lock.  If the
+ * lock isn't immediately available, increment ->nocb_lock_contended to
+ * flag the contention.
+ */
+static void rcu_nocb_bypass_lock(struct rcu_data *rdp)
+       __acquires(&rdp->nocb_bypass_lock)
+{
+       lockdep_assert_irqs_disabled();
+       if (raw_spin_trylock(&rdp->nocb_bypass_lock))
+               return;
+       atomic_inc(&rdp->nocb_lock_contended);
+       WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
+       smp_mb__after_atomic(); /* atomic_inc() before lock. */
+       raw_spin_lock(&rdp->nocb_bypass_lock);
+       smp_mb__before_atomic(); /* atomic_dec() after lock. */
+       atomic_dec(&rdp->nocb_lock_contended);
+}
+
+/*
+ * Spinwait until the specified rcu_data structure's ->nocb_lock is
+ * not contended.  Please note that this is extremely special-purpose,
+ * relying on the fact that at most two kthreads and one CPU contend for
+ * this lock, and also that the two kthreads are guaranteed to have frequent
+ * grace-period-duration time intervals between successive acquisitions
+ * of the lock.  This allows us to use an extremely simple throttling
+ * mechanism, and further to apply it only to the CPU doing floods of
+ * call_rcu() invocations.  Don't try this at home!
+ */
+static void rcu_nocb_wait_contended(struct rcu_data *rdp)
+{
+       WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
+       while (WARN_ON_ONCE(atomic_read(&rdp->nocb_lock_contended)))
+               cpu_relax();
+}
+
+/*
+ * Conditionally acquire the specified rcu_data structure's
+ * ->nocb_bypass_lock.
+ */
+static bool rcu_nocb_bypass_trylock(struct rcu_data *rdp)
+{
+       lockdep_assert_irqs_disabled();
+       return raw_spin_trylock(&rdp->nocb_bypass_lock);
+}
+
+/*
+ * Release the specified rcu_data structure's ->nocb_bypass_lock.
+ */
+static void rcu_nocb_bypass_unlock(struct rcu_data *rdp)
+       __releases(&rdp->nocb_bypass_lock)
+{
+       lockdep_assert_irqs_disabled();
+       raw_spin_unlock(&rdp->nocb_bypass_lock);
+}
+
+/*
+ * Acquire the specified rcu_data structure's ->nocb_lock, but only
+ * if it corresponds to a no-CBs CPU.
+ */
+static void rcu_nocb_lock(struct rcu_data *rdp)
+{
+       lockdep_assert_irqs_disabled();
+       if (!rcu_rdp_is_offloaded(rdp))
+               return;
+       raw_spin_lock(&rdp->nocb_lock);
+}
+
+/*
+ * Release the specified rcu_data structure's ->nocb_lock, but only
+ * if it corresponds to a no-CBs CPU.
+ */
+static void rcu_nocb_unlock(struct rcu_data *rdp)
+{
+       if (rcu_rdp_is_offloaded(rdp)) {
+               lockdep_assert_irqs_disabled();
+               raw_spin_unlock(&rdp->nocb_lock);
+       }
+}
+
+/*
+ * Release the specified rcu_data structure's ->nocb_lock and restore
+ * interrupts, but only if it corresponds to a no-CBs CPU.
+ */
+static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
+                                      unsigned long flags)
+{
+       if (rcu_rdp_is_offloaded(rdp)) {
+               lockdep_assert_irqs_disabled();
+               raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+       } else {
+               local_irq_restore(flags);
+       }
+}
+
+/* Lockdep check that ->cblist may be safely accessed. */
+static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
+{
+       lockdep_assert_irqs_disabled();
+       if (rcu_rdp_is_offloaded(rdp))
+               lockdep_assert_held(&rdp->nocb_lock);
+}
+
+/*
+ * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended
+ * grace period.
+ */
+static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
+{
+       swake_up_all(sq);
+}
+
+static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
+{
+       return &rnp->nocb_gp_wq[rcu_seq_ctr(rnp->gp_seq) & 0x1];
+}
+
+static void rcu_init_one_nocb(struct rcu_node *rnp)
+{
+       init_swait_queue_head(&rnp->nocb_gp_wq[0]);
+       init_swait_queue_head(&rnp->nocb_gp_wq[1]);
+}
+
+/* Is the specified CPU a no-CBs CPU? */
+bool rcu_is_nocb_cpu(int cpu)
+{
+       if (cpumask_available(rcu_nocb_mask))
+               return cpumask_test_cpu(cpu, rcu_nocb_mask);
+       return false;
+}
+
+static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
+                          struct rcu_data *rdp,
+                          bool force, unsigned long flags)
+       __releases(rdp_gp->nocb_gp_lock)
+{
+       bool needwake = false;
+
+       if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) {
+               raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+                                   TPS("AlreadyAwake"));
+               return false;
+       }
+
+       if (rdp_gp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
+               WRITE_ONCE(rdp_gp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+               del_timer(&rdp_gp->nocb_timer);
+       }
+
+       if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) {
+               WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
+               needwake = true;
+       }
+       raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+       if (needwake) {
+               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake"));
+               wake_up_process(rdp_gp->nocb_gp_kthread);
+       }
+
+       return needwake;
+}
+
+/*
+ * Kick the GP kthread for this NOCB group.
+ */
+static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
+{
+       unsigned long flags;
+       struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+
+       raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+       return __wake_nocb_gp(rdp_gp, rdp, force, flags);
+}
+
+/*
+ * Arrange to wake the GP kthread for this NOCB group at some future
+ * time when it is safe to do so.
+ */
+static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
+                              const char *reason)
+{
+       unsigned long flags;
+       struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+
+       raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+
+       /*
+        * Bypass wakeup overrides previous deferments. In case
+        * of callback storm, no need to wake up too early.
+        */
+       if (waketype == RCU_NOCB_WAKE_BYPASS) {
+               mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
+               WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
+       } else {
+               if (rdp_gp->nocb_defer_wakeup < RCU_NOCB_WAKE)
+                       mod_timer(&rdp_gp->nocb_timer, jiffies + 1);
+               if (rdp_gp->nocb_defer_wakeup < waketype)
+                       WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
+       }
+
+       raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+
+       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
+}
+
+/*
+ * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
+ * However, if there is a callback to be enqueued and if ->nocb_bypass
+ * proves to be initially empty, just return false because the no-CB GP
+ * kthread may need to be awakened in this case.
+ *
+ * Note that this function always returns true if rhp is NULL.
+ */
+static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+                                    unsigned long j)
+{
+       struct rcu_cblist rcl;
+
+       WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp));
+       rcu_lockdep_assert_cblist_protected(rdp);
+       lockdep_assert_held(&rdp->nocb_bypass_lock);
+       if (rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass)) {
+               raw_spin_unlock(&rdp->nocb_bypass_lock);
+               return false;
+       }
+       /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
+       if (rhp)
+               rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
+       rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
+       rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
+       WRITE_ONCE(rdp->nocb_bypass_first, j);
+       rcu_nocb_bypass_unlock(rdp);
+       return true;
+}
+
+/*
+ * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
+ * However, if there is a callback to be enqueued and if ->nocb_bypass
+ * proves to be initially empty, just return false because the no-CB GP
+ * kthread may need to be awakened in this case.
+ *
+ * Note that this function always returns true if rhp is NULL.
+ */
+static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+                                 unsigned long j)
+{
+       if (!rcu_rdp_is_offloaded(rdp))
+               return true;
+       rcu_lockdep_assert_cblist_protected(rdp);
+       rcu_nocb_bypass_lock(rdp);
+       return rcu_nocb_do_flush_bypass(rdp, rhp, j);
+}
+
+/*
+ * If the ->nocb_bypass_lock is immediately available, flush the
+ * ->nocb_bypass queue into ->cblist.
+ */
+static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
+{
+       rcu_lockdep_assert_cblist_protected(rdp);
+       if (!rcu_rdp_is_offloaded(rdp) ||
+           !rcu_nocb_bypass_trylock(rdp))
+               return;
+       WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
+}
+
+/*
+ * See whether it is appropriate to use the ->nocb_bypass list in order
+ * to control contention on ->nocb_lock.  A limited number of direct
+ * enqueues are permitted into ->cblist per jiffy.  If ->nocb_bypass
+ * is non-empty, further callbacks must be placed into ->nocb_bypass,
+ * otherwise rcu_barrier() breaks.  Use rcu_nocb_flush_bypass() to switch
+ * back to direct use of ->cblist.  However, ->nocb_bypass should not be
+ * used if ->cblist is empty, because otherwise callbacks can be stranded
+ * on ->nocb_bypass because we cannot count on the current CPU ever again
+ * invoking call_rcu().  The general rule is that if ->nocb_bypass is
+ * non-empty, the corresponding no-CBs grace-period kthread must not be
+ * in an indefinite sleep state.
+ *
+ * Finally, it is not permitted to use the bypass during early boot,
+ * as doing so would confuse the auto-initialization code.  Besides
+ * which, there is no point in worrying about lock contention while
+ * there is only one CPU in operation.
+ */
+static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+                               bool *was_alldone, unsigned long flags)
+{
+       unsigned long c;
+       unsigned long cur_gp_seq;
+       unsigned long j = jiffies;
+       long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+
+       lockdep_assert_irqs_disabled();
+
+       // Pure softirq/rcuc based processing: no bypassing, no
+       // locking.
+       if (!rcu_rdp_is_offloaded(rdp)) {
+               *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+               return false;
+       }
+
+       // In the process of (de-)offloading: no bypassing, but
+       // locking.
+       if (!rcu_segcblist_completely_offloaded(&rdp->cblist)) {
+               rcu_nocb_lock(rdp);
+               *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+               return false; /* Not offloaded, no bypassing. */
+       }
+
+       // Don't use ->nocb_bypass during early boot.
+       if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) {
+               rcu_nocb_lock(rdp);
+               WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+               *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+               return false;
+       }
+
+       // If we have advanced to a new jiffy, reset counts to allow
+       // moving back from ->nocb_bypass to ->cblist.
+       if (j == rdp->nocb_nobypass_last) {
+               c = rdp->nocb_nobypass_count + 1;
+       } else {
+               WRITE_ONCE(rdp->nocb_nobypass_last, j);
+               c = rdp->nocb_nobypass_count - nocb_nobypass_lim_per_jiffy;
+               if (ULONG_CMP_LT(rdp->nocb_nobypass_count,
+                                nocb_nobypass_lim_per_jiffy))
+                       c = 0;
+               else if (c > nocb_nobypass_lim_per_jiffy)
+                       c = nocb_nobypass_lim_per_jiffy;
+       }
+       WRITE_ONCE(rdp->nocb_nobypass_count, c);
+
+       // If there hasn't yet been all that many ->cblist enqueues
+       // this jiffy, tell the caller to enqueue onto ->cblist.  But flush
+       // ->nocb_bypass first.
+       if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
+               rcu_nocb_lock(rdp);
+               *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+               if (*was_alldone)
+                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+                                           TPS("FirstQ"));
+               WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
+               WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+               return false; // Caller must enqueue the callback.
+       }
+
+       // If ->nocb_bypass has been used too long or is too full,
+       // flush ->nocb_bypass to ->cblist.
+       if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
+           ncbs >= qhimark) {
+               rcu_nocb_lock(rdp);
+               if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
+                       *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
+                       if (*was_alldone)
+                               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+                                                   TPS("FirstQ"));
+                       WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+                       return false; // Caller must enqueue the callback.
+               }
+               if (j != rdp->nocb_gp_adv_time &&
+                   rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+                   rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
+                       rcu_advance_cbs_nowake(rdp->mynode, rdp);
+                       rdp->nocb_gp_adv_time = j;
+               }
+               rcu_nocb_unlock_irqrestore(rdp, flags);
+               return true; // Callback already enqueued.
+       }
+
+       // We need to use the bypass.
+       rcu_nocb_wait_contended(rdp);
+       rcu_nocb_bypass_lock(rdp);
+       ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+       rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
+       rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
+       if (!ncbs) {
+               WRITE_ONCE(rdp->nocb_bypass_first, j);
+               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
+       }
+       rcu_nocb_bypass_unlock(rdp);
+       smp_mb(); /* Order enqueue before wake. */
+       if (ncbs) {
+               local_irq_restore(flags);
+       } else {
+               // No-CBs GP kthread might be indefinitely asleep, if so, wake.
+               rcu_nocb_lock(rdp); // Rare during call_rcu() flood.
+               if (!rcu_segcblist_pend_cbs(&rdp->cblist)) {
+                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+                                           TPS("FirstBQwake"));
+                       __call_rcu_nocb_wake(rdp, true, flags);
+               } else {
+                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+                                           TPS("FirstBQnoWake"));
+                       rcu_nocb_unlock_irqrestore(rdp, flags);
+               }
+       }
+       return true; // Callback already enqueued.
+}
+
+/*
+ * Awaken the no-CBs grace-period kthread if needed, either due to it
+ * legitimately being asleep or due to overload conditions.
+ *
+ * If warranted, also wake up the kthread servicing this CPUs queues.
+ */
+static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
+                                unsigned long flags)
+                                __releases(rdp->nocb_lock)
+{
+       unsigned long cur_gp_seq;
+       unsigned long j;
+       long len;
+       struct task_struct *t;
+
+       // If we are being polled or there is no kthread, just leave.
+       t = READ_ONCE(rdp->nocb_gp_kthread);
+       if (rcu_nocb_poll || !t) {
+               rcu_nocb_unlock_irqrestore(rdp, flags);
+               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+                                   TPS("WakeNotPoll"));
+               return;
+       }
+       // Need to actually to a wakeup.
+       len = rcu_segcblist_n_cbs(&rdp->cblist);
+       if (was_alldone) {
+               rdp->qlen_last_fqs_check = len;
+               if (!irqs_disabled_flags(flags)) {
+                       /* ... if queue was empty ... */
+                       rcu_nocb_unlock_irqrestore(rdp, flags);
+                       wake_nocb_gp(rdp, false);
+                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+                                           TPS("WakeEmpty"));
+               } else {
+                       rcu_nocb_unlock_irqrestore(rdp, flags);
+                       wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
+                                          TPS("WakeEmptyIsDeferred"));
+               }
+       } else if (len > rdp->qlen_last_fqs_check + qhimark) {
+               /* ... or if many callbacks queued. */
+               rdp->qlen_last_fqs_check = len;
+               j = jiffies;
+               if (j != rdp->nocb_gp_adv_time &&
+                   rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+                   rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
+                       rcu_advance_cbs_nowake(rdp->mynode, rdp);
+                       rdp->nocb_gp_adv_time = j;
+               }
+               smp_mb(); /* Enqueue before timer_pending(). */
+               if ((rdp->nocb_cb_sleep ||
+                    !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
+                   !timer_pending(&rdp->nocb_timer)) {
+                       rcu_nocb_unlock_irqrestore(rdp, flags);
+                       wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
+                                          TPS("WakeOvfIsDeferred"));
+               } else {
+                       rcu_nocb_unlock_irqrestore(rdp, flags);
+                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
+               }
+       } else {
+               rcu_nocb_unlock_irqrestore(rdp, flags);
+               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
+       }
+       return;
+}
+
+/*
+ * Check if we ignore this rdp.
+ *
+ * We check that without holding the nocb lock but
+ * we make sure not to miss a freshly offloaded rdp
+ * with the current ordering:
+ *
+ *  rdp_offload_toggle()        nocb_gp_enabled_cb()
+ * -------------------------   ----------------------------
+ *    WRITE flags                 LOCK nocb_gp_lock
+ *    LOCK nocb_gp_lock           READ/WRITE nocb_gp_sleep
+ *    READ/WRITE nocb_gp_sleep    UNLOCK nocb_gp_lock
+ *    UNLOCK nocb_gp_lock         READ flags
+ */
+static inline bool nocb_gp_enabled_cb(struct rcu_data *rdp)
+{
+       u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_GP;
+
+       return rcu_segcblist_test_flags(&rdp->cblist, flags);
+}
+
+static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp,
+                                                    bool *needwake_state)
+{
+       struct rcu_segcblist *cblist = &rdp->cblist;
+
+       if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
+               if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
+                       rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
+                       if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
+                               *needwake_state = true;
+               }
+               return false;
+       }
+
+       /*
+        * De-offloading. Clear our flag and notify the de-offload worker.
+        * We will ignore this rdp until it ever gets re-offloaded.
+        */
+       WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
+       rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
+       if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
+               *needwake_state = true;
+       return true;
+}
+
+
+/*
+ * No-CBs GP kthreads come here to wait for additional callbacks to show up
+ * or for grace periods to end.
+ */
+static void nocb_gp_wait(struct rcu_data *my_rdp)
+{
+       bool bypass = false;
+       long bypass_ncbs;
+       int __maybe_unused cpu = my_rdp->cpu;
+       unsigned long cur_gp_seq;
+       unsigned long flags;
+       bool gotcbs = false;
+       unsigned long j = jiffies;
+       bool needwait_gp = false; // This prevents actual uninitialized use.
+       bool needwake;
+       bool needwake_gp;
+       struct rcu_data *rdp;
+       struct rcu_node *rnp;
+       unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning.
+       bool wasempty = false;
+
+       /*
+        * Each pass through the following loop checks for CBs and for the
+        * nearest grace period (if any) to wait for next.  The CB kthreads
+        * and the global grace-period kthread are awakened if needed.
+        */
+       WARN_ON_ONCE(my_rdp->nocb_gp_rdp != my_rdp);
+       for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) {
+               bool needwake_state = false;
+
+               if (!nocb_gp_enabled_cb(rdp))
+                       continue;
+               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
+               rcu_nocb_lock_irqsave(rdp, flags);
+               if (nocb_gp_update_state_deoffloading(rdp, &needwake_state)) {
+                       rcu_nocb_unlock_irqrestore(rdp, flags);
+                       if (needwake_state)
+                               swake_up_one(&rdp->nocb_state_wq);
+                       continue;
+               }
+               bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+               if (bypass_ncbs &&
+                   (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
+                    bypass_ncbs > 2 * qhimark)) {
+                       // Bypass full or old, so flush it.
+                       (void)rcu_nocb_try_flush_bypass(rdp, j);
+                       bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
+               } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
+                       rcu_nocb_unlock_irqrestore(rdp, flags);
+                       if (needwake_state)
+                               swake_up_one(&rdp->nocb_state_wq);
+                       continue; /* No callbacks here, try next. */
+               }
+               if (bypass_ncbs) {
+                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+                                           TPS("Bypass"));
+                       bypass = true;
+               }
+               rnp = rdp->mynode;
+
+               // Advance callbacks if helpful and low contention.
+               needwake_gp = false;
+               if (!rcu_segcblist_restempty(&rdp->cblist,
+                                            RCU_NEXT_READY_TAIL) ||
+                   (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+                    rcu_seq_done(&rnp->gp_seq, cur_gp_seq))) {
+                       raw_spin_lock_rcu_node(rnp); /* irqs disabled. */
+                       needwake_gp = rcu_advance_cbs(rnp, rdp);
+                       wasempty = rcu_segcblist_restempty(&rdp->cblist,
+                                                          RCU_NEXT_READY_TAIL);
+                       raw_spin_unlock_rcu_node(rnp); /* irqs disabled. */
+               }
+               // Need to wait on some grace period?
+               WARN_ON_ONCE(wasempty &&
+                            !rcu_segcblist_restempty(&rdp->cblist,
+                                                     RCU_NEXT_READY_TAIL));
+               if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) {
+                       if (!needwait_gp ||
+                           ULONG_CMP_LT(cur_gp_seq, wait_gp_seq))
+                               wait_gp_seq = cur_gp_seq;
+                       needwait_gp = true;
+                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
+                                           TPS("NeedWaitGP"));
+               }
+               if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
+                       needwake = rdp->nocb_cb_sleep;
+                       WRITE_ONCE(rdp->nocb_cb_sleep, false);
+                       smp_mb(); /* CB invocation -after- GP end. */
+               } else {
+                       needwake = false;
+               }
+               rcu_nocb_unlock_irqrestore(rdp, flags);
+               if (needwake) {
+                       swake_up_one(&rdp->nocb_cb_wq);
+                       gotcbs = true;
+               }
+               if (needwake_gp)
+                       rcu_gp_kthread_wake();
+               if (needwake_state)
+                       swake_up_one(&rdp->nocb_state_wq);
+       }
+
+       my_rdp->nocb_gp_bypass = bypass;
+       my_rdp->nocb_gp_gp = needwait_gp;
+       my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
+
+       if (bypass && !rcu_nocb_poll) {
+               // At least one child with non-empty ->nocb_bypass, so set
+               // timer in order to avoid stranding its callbacks.
+               wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
+                                  TPS("WakeBypassIsDeferred"));
+       }
+       if (rcu_nocb_poll) {
+               /* Polling, so trace if first poll in the series. */
+               if (gotcbs)
+                       trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll"));
+               schedule_timeout_idle(1);
+       } else if (!needwait_gp) {
+               /* Wait for callbacks to appear. */
+               trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep"));
+               swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq,
+                               !READ_ONCE(my_rdp->nocb_gp_sleep));
+               trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep"));
+       } else {
+               rnp = my_rdp->mynode;
+               trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait"));
+               swait_event_interruptible_exclusive(
+                       rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1],
+                       rcu_seq_done(&rnp->gp_seq, wait_gp_seq) ||
+                       !READ_ONCE(my_rdp->nocb_gp_sleep));
+               trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));
+       }
+       if (!rcu_nocb_poll) {
+               raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
+               if (my_rdp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
+                       WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+                       del_timer(&my_rdp->nocb_timer);
+               }
+               WRITE_ONCE(my_rdp->nocb_gp_sleep, true);
+               raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
+       }
+       my_rdp->nocb_gp_seq = -1;
+       WARN_ON(signal_pending(current));
+}
+
+/*
+ * No-CBs grace-period-wait kthread.  There is one of these per group
+ * of CPUs, but only once at least one CPU in that group has come online
+ * at least once since boot.  This kthread checks for newly posted
+ * callbacks from any of the CPUs it is responsible for, waits for a
+ * grace period, then awakens all of the rcu_nocb_cb_kthread() instances
+ * that then have callback-invocation work to do.
+ */
+static int rcu_nocb_gp_kthread(void *arg)
+{
+       struct rcu_data *rdp = arg;
+
+       for (;;) {
+               WRITE_ONCE(rdp->nocb_gp_loops, rdp->nocb_gp_loops + 1);
+               nocb_gp_wait(rdp);
+               cond_resched_tasks_rcu_qs();
+       }
+       return 0;
+}
+
+static inline bool nocb_cb_can_run(struct rcu_data *rdp)
+{
+       u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_CB;
+       return rcu_segcblist_test_flags(&rdp->cblist, flags);
+}
+
+static inline bool nocb_cb_wait_cond(struct rcu_data *rdp)
+{
+       return nocb_cb_can_run(rdp) && !READ_ONCE(rdp->nocb_cb_sleep);
+}
+
+/*
+ * Invoke any ready callbacks from the corresponding no-CBs CPU,
+ * then, if there are no more, wait for more to appear.
+ */
+static void nocb_cb_wait(struct rcu_data *rdp)
+{
+       struct rcu_segcblist *cblist = &rdp->cblist;
+       unsigned long cur_gp_seq;
+       unsigned long flags;
+       bool needwake_state = false;
+       bool needwake_gp = false;
+       bool can_sleep = true;
+       struct rcu_node *rnp = rdp->mynode;
+
+       local_irq_save(flags);
+       rcu_momentary_dyntick_idle();
+       local_irq_restore(flags);
+       /*
+        * Disable BH to provide the expected environment.  Also, when
+        * transitioning to/from NOCB mode, a self-requeuing callback might
+        * be invoked from softirq.  A short grace period could cause both
+        * instances of this callback would execute concurrently.
+        */
+       local_bh_disable();
+       rcu_do_batch(rdp);
+       local_bh_enable();
+       lockdep_assert_irqs_enabled();
+       rcu_nocb_lock_irqsave(rdp, flags);
+       if (rcu_segcblist_nextgp(cblist, &cur_gp_seq) &&
+           rcu_seq_done(&rnp->gp_seq, cur_gp_seq) &&
+           raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */
+               needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
+               raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
+       }
+
+       if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
+               if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB)) {
+                       rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_CB);
+                       if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
+                               needwake_state = true;
+               }
+               if (rcu_segcblist_ready_cbs(cblist))
+                       can_sleep = false;
+       } else {
+               /*
+                * De-offloading. Clear our flag and notify the de-offload worker.
+                * We won't touch the callbacks and keep sleeping until we ever
+                * get re-offloaded.
+                */
+               WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB));
+               rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_CB);
+               if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
+                       needwake_state = true;
+       }
+
+       WRITE_ONCE(rdp->nocb_cb_sleep, can_sleep);
+
+       if (rdp->nocb_cb_sleep)
+               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
+
+       rcu_nocb_unlock_irqrestore(rdp, flags);
+       if (needwake_gp)
+               rcu_gp_kthread_wake();
+
+       if (needwake_state)
+               swake_up_one(&rdp->nocb_state_wq);
+
+       do {
+               swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
+                                                   nocb_cb_wait_cond(rdp));
+
+               // VVV Ensure CB invocation follows _sleep test.
+               if (smp_load_acquire(&rdp->nocb_cb_sleep)) { // ^^^
+                       WARN_ON(signal_pending(current));
+                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
+               }
+       } while (!nocb_cb_can_run(rdp));
+}
+
+/*
+ * Per-rcu_data kthread, but only for no-CBs CPUs.  Repeatedly invoke
+ * nocb_cb_wait() to do the dirty work.
+ */
+static int rcu_nocb_cb_kthread(void *arg)
+{
+       struct rcu_data *rdp = arg;
+
+       // Each pass through this loop does one callback batch, and,
+       // if there are no more ready callbacks, waits for them.
+       for (;;) {
+               nocb_cb_wait(rdp);
+               cond_resched_tasks_rcu_qs();
+       }
+       return 0;
+}
+
+/* Is a deferred wakeup of rcu_nocb_kthread() required? */
+static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
+{
+       return READ_ONCE(rdp->nocb_defer_wakeup) >= level;
+}
+
+/* Do a deferred wakeup of rcu_nocb_kthread(). */
+static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp_gp,
+                                          struct rcu_data *rdp, int level,
+                                          unsigned long flags)
+       __releases(rdp_gp->nocb_gp_lock)
+{
+       int ndw;
+       int ret;
+
+       if (!rcu_nocb_need_deferred_wakeup(rdp_gp, level)) {
+               raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+               return false;
+       }
+
+       ndw = rdp_gp->nocb_defer_wakeup;
+       ret = __wake_nocb_gp(rdp_gp, rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
+       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
+
+       return ret;
+}
+
+/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
+static void do_nocb_deferred_wakeup_timer(struct timer_list *t)
+{
+       unsigned long flags;
+       struct rcu_data *rdp = from_timer(rdp, t, nocb_timer);
+
+       WARN_ON_ONCE(rdp->nocb_gp_rdp != rdp);
+       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
+
+       raw_spin_lock_irqsave(&rdp->nocb_gp_lock, flags);
+       smp_mb__after_spinlock(); /* Timer expire before wakeup. */
+       do_nocb_deferred_wakeup_common(rdp, rdp, RCU_NOCB_WAKE_BYPASS, flags);
+}
+
+/*
+ * Do a deferred wakeup of rcu_nocb_kthread() from fastpath.
+ * This means we do an inexact common-case check.  Note that if
+ * we miss, ->nocb_timer will eventually clean things up.
+ */
+static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
+{
+       unsigned long flags;
+       struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+
+       if (!rdp_gp || !rcu_nocb_need_deferred_wakeup(rdp_gp, RCU_NOCB_WAKE))
+               return false;
+
+       raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+       return do_nocb_deferred_wakeup_common(rdp_gp, rdp, RCU_NOCB_WAKE, flags);
+}
+
+void rcu_nocb_flush_deferred_wakeup(void)
+{
+       do_nocb_deferred_wakeup(this_cpu_ptr(&rcu_data));
+}
+EXPORT_SYMBOL_GPL(rcu_nocb_flush_deferred_wakeup);
+
+static int rdp_offload_toggle(struct rcu_data *rdp,
+                              bool offload, unsigned long flags)
+       __releases(rdp->nocb_lock)
+{
+       struct rcu_segcblist *cblist = &rdp->cblist;
+       struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+       bool wake_gp = false;
+
+       rcu_segcblist_offload(cblist, offload);
+
+       if (rdp->nocb_cb_sleep)
+               rdp->nocb_cb_sleep = false;
+       rcu_nocb_unlock_irqrestore(rdp, flags);
+
+       /*
+        * Ignore former value of nocb_cb_sleep and force wake up as it could
+        * have been spuriously set to false already.
+        */
+       swake_up_one(&rdp->nocb_cb_wq);
+
+       raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+       if (rdp_gp->nocb_gp_sleep) {
+               rdp_gp->nocb_gp_sleep = false;
+               wake_gp = true;
+       }
+       raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+
+       if (wake_gp)
+               wake_up_process(rdp_gp->nocb_gp_kthread);
+
+       return 0;
+}
+
+static long rcu_nocb_rdp_deoffload(void *arg)
+{
+       struct rcu_data *rdp = arg;
+       struct rcu_segcblist *cblist = &rdp->cblist;
+       unsigned long flags;
+       int ret;
+
+       WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
+
+       pr_info("De-offloading %d\n", rdp->cpu);
+
+       rcu_nocb_lock_irqsave(rdp, flags);
+       /*
+        * Flush once and for all now. This suffices because we are
+        * running on the target CPU holding ->nocb_lock (thus having
+        * interrupts disabled), and because rdp_offload_toggle()
+        * invokes rcu_segcblist_offload(), which clears SEGCBLIST_OFFLOADED.
+        * Thus future calls to rcu_segcblist_completely_offloaded() will
+        * return false, which means that future calls to rcu_nocb_try_bypass()
+        * will refuse to put anything into the bypass.
+        */
+       WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
+       ret = rdp_offload_toggle(rdp, false, flags);
+       swait_event_exclusive(rdp->nocb_state_wq,
+                             !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB |
+                                                       SEGCBLIST_KTHREAD_GP));
+       /*
+        * Lock one last time to acquire latest callback updates from kthreads
+        * so we can later handle callbacks locally without locking.
+        */
+       rcu_nocb_lock_irqsave(rdp, flags);
+       /*
+        * Theoretically we could set SEGCBLIST_SOFTIRQ_ONLY after the nocb
+        * lock is released but how about being paranoid for once?
+        */
+       rcu_segcblist_set_flags(cblist, SEGCBLIST_SOFTIRQ_ONLY);
+       /*
+        * With SEGCBLIST_SOFTIRQ_ONLY, we can't use
+        * rcu_nocb_unlock_irqrestore() anymore.
+        */
+       raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+
+       /* Sanity check */
+       WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
+
+
+       return ret;
+}
+
+int rcu_nocb_cpu_deoffload(int cpu)
+{
+       struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+       int ret = 0;
+
+       mutex_lock(&rcu_state.barrier_mutex);
+       cpus_read_lock();
+       if (rcu_rdp_is_offloaded(rdp)) {
+               if (cpu_online(cpu)) {
+                       ret = work_on_cpu(cpu, rcu_nocb_rdp_deoffload, rdp);
+                       if (!ret)
+                               cpumask_clear_cpu(cpu, rcu_nocb_mask);
+               } else {
+                       pr_info("NOCB: Can't CB-deoffload an offline CPU\n");
+                       ret = -EINVAL;
+               }
+       }
+       cpus_read_unlock();
+       mutex_unlock(&rcu_state.barrier_mutex);
+
+       return ret;
+}
+EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload);
+
+static long rcu_nocb_rdp_offload(void *arg)
+{
+       struct rcu_data *rdp = arg;
+       struct rcu_segcblist *cblist = &rdp->cblist;
+       unsigned long flags;
+       int ret;
+
+       WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
+       /*
+        * For now we only support re-offload, ie: the rdp must have been
+        * offloaded on boot first.
+        */
+       if (!rdp->nocb_gp_rdp)
+               return -EINVAL;
+
+       pr_info("Offloading %d\n", rdp->cpu);
+       /*
+        * Can't use rcu_nocb_lock_irqsave() while we are in
+        * SEGCBLIST_SOFTIRQ_ONLY mode.
+        */
+       raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+
+       /*
+        * We didn't take the nocb lock while working on the
+        * rdp->cblist in SEGCBLIST_SOFTIRQ_ONLY mode.
+        * Every modifications that have been done previously on
+        * rdp->cblist must be visible remotely by the nocb kthreads
+        * upon wake up after reading the cblist flags.
+        *
+        * The layout against nocb_lock enforces that ordering:
+        *
+        *  __rcu_nocb_rdp_offload()   nocb_cb_wait()/nocb_gp_wait()
+        * -------------------------   ----------------------------
+        *      WRITE callbacks           rcu_nocb_lock()
+        *      rcu_nocb_lock()           READ flags
+        *      WRITE flags               READ callbacks
+        *      rcu_nocb_unlock()         rcu_nocb_unlock()
+        */
+       ret = rdp_offload_toggle(rdp, true, flags);
+       swait_event_exclusive(rdp->nocb_state_wq,
+                             rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB) &&
+                             rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
+
+       return ret;
+}
+
+int rcu_nocb_cpu_offload(int cpu)
+{
+       struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+       int ret = 0;
+
+       mutex_lock(&rcu_state.barrier_mutex);
+       cpus_read_lock();
+       if (!rcu_rdp_is_offloaded(rdp)) {
+               if (cpu_online(cpu)) {
+                       ret = work_on_cpu(cpu, rcu_nocb_rdp_offload, rdp);
+                       if (!ret)
+                               cpumask_set_cpu(cpu, rcu_nocb_mask);
+               } else {
+                       pr_info("NOCB: Can't CB-offload an offline CPU\n");
+                       ret = -EINVAL;
+               }
+       }
+       cpus_read_unlock();
+       mutex_unlock(&rcu_state.barrier_mutex);
+
+       return ret;
+}
+EXPORT_SYMBOL_GPL(rcu_nocb_cpu_offload);
+
+void __init rcu_init_nohz(void)
+{
+       int cpu;
+       bool need_rcu_nocb_mask = false;
+       struct rcu_data *rdp;
+
+#if defined(CONFIG_NO_HZ_FULL)
+       if (tick_nohz_full_running && cpumask_weight(tick_nohz_full_mask))
+               need_rcu_nocb_mask = true;
+#endif /* #if defined(CONFIG_NO_HZ_FULL) */
+
+       if (!cpumask_available(rcu_nocb_mask) && need_rcu_nocb_mask) {
+               if (!zalloc_cpumask_var(&rcu_nocb_mask, GFP_KERNEL)) {
+                       pr_info("rcu_nocb_mask allocation failed, callback offloading disabled.\n");
+                       return;
+               }
+       }
+       if (!cpumask_available(rcu_nocb_mask))
+               return;
+
+#if defined(CONFIG_NO_HZ_FULL)
+       if (tick_nohz_full_running)
+               cpumask_or(rcu_nocb_mask, rcu_nocb_mask, tick_nohz_full_mask);
+#endif /* #if defined(CONFIG_NO_HZ_FULL) */
+
+       if (!cpumask_subset(rcu_nocb_mask, cpu_possible_mask)) {
+               pr_info("\tNote: kernel parameter 'rcu_nocbs=', 'nohz_full', or 'isolcpus=' contains nonexistent CPUs.\n");
+               cpumask_and(rcu_nocb_mask, cpu_possible_mask,
+                           rcu_nocb_mask);
+       }
+       if (cpumask_empty(rcu_nocb_mask))
+               pr_info("\tOffload RCU callbacks from CPUs: (none).\n");
+       else
+               pr_info("\tOffload RCU callbacks from CPUs: %*pbl.\n",
+                       cpumask_pr_args(rcu_nocb_mask));
+       if (rcu_nocb_poll)
+               pr_info("\tPoll for callbacks from no-CBs CPUs.\n");
+
+       for_each_cpu(cpu, rcu_nocb_mask) {
+               rdp = per_cpu_ptr(&rcu_data, cpu);
+               if (rcu_segcblist_empty(&rdp->cblist))
+                       rcu_segcblist_init(&rdp->cblist);
+               rcu_segcblist_offload(&rdp->cblist, true);
+               rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_CB);
+               rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
+       }
+       rcu_organize_nocb_kthreads();
+}
+
+/* Initialize per-rcu_data variables for no-CBs CPUs. */
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+       init_swait_queue_head(&rdp->nocb_cb_wq);
+       init_swait_queue_head(&rdp->nocb_gp_wq);
+       init_swait_queue_head(&rdp->nocb_state_wq);
+       raw_spin_lock_init(&rdp->nocb_lock);
+       raw_spin_lock_init(&rdp->nocb_bypass_lock);
+       raw_spin_lock_init(&rdp->nocb_gp_lock);
+       timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
+       rcu_cblist_init(&rdp->nocb_bypass);
+}
+
+/*
+ * If the specified CPU is a no-CBs CPU that does not already have its
+ * rcuo CB kthread, spawn it.  Additionally, if the rcuo GP kthread
+ * for this CPU's group has not yet been created, spawn it as well.
+ */
+static void rcu_spawn_one_nocb_kthread(int cpu)
+{
+       struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+       struct rcu_data *rdp_gp;
+       struct task_struct *t;
+
+       /*
+        * If this isn't a no-CBs CPU or if it already has an rcuo kthread,
+        * then nothing to do.
+        */
+       if (!rcu_is_nocb_cpu(cpu) || rdp->nocb_cb_kthread)
+               return;
+
+       /* If we didn't spawn the GP kthread first, reorganize! */
+       rdp_gp = rdp->nocb_gp_rdp;
+       if (!rdp_gp->nocb_gp_kthread) {
+               t = kthread_run(rcu_nocb_gp_kthread, rdp_gp,
+                               "rcuog/%d", rdp_gp->cpu);
+               if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__))
+                       return;
+               WRITE_ONCE(rdp_gp->nocb_gp_kthread, t);
+       }
+
+       /* Spawn the kthread for this CPU. */
+       t = kthread_run(rcu_nocb_cb_kthread, rdp,
+                       "rcuo%c/%d", rcu_state.abbr, cpu);
+       if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo CB kthread, OOM is now expected behavior\n", __func__))
+               return;
+       WRITE_ONCE(rdp->nocb_cb_kthread, t);
+       WRITE_ONCE(rdp->nocb_gp_kthread, rdp_gp->nocb_gp_kthread);
+}
+
+/*
+ * If the specified CPU is a no-CBs CPU that does not already have its
+ * rcuo kthread, spawn it.
+ */
+static void rcu_spawn_cpu_nocb_kthread(int cpu)
+{
+       if (rcu_scheduler_fully_active)
+               rcu_spawn_one_nocb_kthread(cpu);
+}
+
+/*
+ * Once the scheduler is running, spawn rcuo kthreads for all online
+ * no-CBs CPUs.  This assumes that the early_initcall()s happen before
+ * non-boot CPUs come online -- if this changes, we will need to add
+ * some mutual exclusion.
+ */
+static void __init rcu_spawn_nocb_kthreads(void)
+{
+       int cpu;
+
+       for_each_online_cpu(cpu)
+               rcu_spawn_cpu_nocb_kthread(cpu);
+}
+
+/* How many CB CPU IDs per GP kthread?  Default of -1 for sqrt(nr_cpu_ids). */
+static int rcu_nocb_gp_stride = -1;
+module_param(rcu_nocb_gp_stride, int, 0444);
+
+/*
+ * Initialize GP-CB relationships for all no-CBs CPU.
+ */
+static void __init rcu_organize_nocb_kthreads(void)
+{
+       int cpu;
+       bool firsttime = true;
+       bool gotnocbs = false;
+       bool gotnocbscbs = true;
+       int ls = rcu_nocb_gp_stride;
+       int nl = 0;  /* Next GP kthread. */
+       struct rcu_data *rdp;
+       struct rcu_data *rdp_gp = NULL;  /* Suppress misguided gcc warn. */
+       struct rcu_data *rdp_prev = NULL;
+
+       if (!cpumask_available(rcu_nocb_mask))
+               return;
+       if (ls == -1) {
+               ls = nr_cpu_ids / int_sqrt(nr_cpu_ids);
+               rcu_nocb_gp_stride = ls;
+       }
+
+       /*
+        * Each pass through this loop sets up one rcu_data structure.
+        * Should the corresponding CPU come online in the future, then
+        * we will spawn the needed set of rcu_nocb_kthread() kthreads.
+        */
+       for_each_cpu(cpu, rcu_nocb_mask) {
+               rdp = per_cpu_ptr(&rcu_data, cpu);
+               if (rdp->cpu >= nl) {
+                       /* New GP kthread, set up for CBs & next GP. */
+                       gotnocbs = true;
+                       nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
+                       rdp->nocb_gp_rdp = rdp;
+                       rdp_gp = rdp;
+                       if (dump_tree) {
+                               if (!firsttime)
+                                       pr_cont("%s\n", gotnocbscbs
+                                                       ? "" : " (self only)");
+                               gotnocbscbs = false;
+                               firsttime = false;
+                               pr_alert("%s: No-CB GP kthread CPU %d:",
+                                        __func__, cpu);
+                       }
+               } else {
+                       /* Another CB kthread, link to previous GP kthread. */
+                       gotnocbscbs = true;
+                       rdp->nocb_gp_rdp = rdp_gp;
+                       rdp_prev->nocb_next_cb_rdp = rdp;
+                       if (dump_tree)
+                               pr_cont(" %d", cpu);
+               }
+               rdp_prev = rdp;
+       }
+       if (gotnocbs && dump_tree)
+               pr_cont("%s\n", gotnocbscbs ? "" : " (self only)");
+}
+
+/*
+ * Bind the current task to the offloaded CPUs.  If there are no offloaded
+ * CPUs, leave the task unbound.  Splat if the bind attempt fails.
+ */
+void rcu_bind_current_to_nocb(void)
+{
+       if (cpumask_available(rcu_nocb_mask) && cpumask_weight(rcu_nocb_mask))
+               WARN_ON(sched_setaffinity(current->pid, rcu_nocb_mask));
+}
+EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb);
+
+// The ->on_cpu field is available only in CONFIG_SMP=y, so...
+#ifdef CONFIG_SMP
+static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
+{
+       return tsp && task_is_running(tsp) && !tsp->on_cpu ? "!" : "";
+}
+#else // #ifdef CONFIG_SMP
+static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
+{
+       return "";
+}
+#endif // #else #ifdef CONFIG_SMP
+
+/*
+ * Dump out nocb grace-period kthread state for the specified rcu_data
+ * structure.
+ */
+static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
+{
+       struct rcu_node *rnp = rdp->mynode;
+
+       pr_info("nocb GP %d %c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu %c CPU %d%s\n",
+               rdp->cpu,
+               "kK"[!!rdp->nocb_gp_kthread],
+               "lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)],
+               "dD"[!!rdp->nocb_defer_wakeup],
+               "tT"[timer_pending(&rdp->nocb_timer)],
+               "sS"[!!rdp->nocb_gp_sleep],
+               ".W"[swait_active(&rdp->nocb_gp_wq)],
+               ".W"[swait_active(&rnp->nocb_gp_wq[0])],
+               ".W"[swait_active(&rnp->nocb_gp_wq[1])],
+               ".B"[!!rdp->nocb_gp_bypass],
+               ".G"[!!rdp->nocb_gp_gp],
+               (long)rdp->nocb_gp_seq,
+               rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops),
+               rdp->nocb_gp_kthread ? task_state_to_char(rdp->nocb_gp_kthread) : '.',
+               rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
+               show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
+}
+
+/* Dump out nocb kthread state for the specified rcu_data structure. */
+static void show_rcu_nocb_state(struct rcu_data *rdp)
+{
+       char bufw[20];
+       char bufr[20];
+       struct rcu_segcblist *rsclp = &rdp->cblist;
+       bool waslocked;
+       bool wassleep;
+
+       if (rdp->nocb_gp_rdp == rdp)
+               show_rcu_nocb_gp_state(rdp);
+
+       sprintf(bufw, "%ld", rsclp->gp_seq[RCU_WAIT_TAIL]);
+       sprintf(bufr, "%ld", rsclp->gp_seq[RCU_NEXT_READY_TAIL]);
+       pr_info("   CB %d^%d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%s%c%s%c%c q%ld %c CPU %d%s\n",
+               rdp->cpu, rdp->nocb_gp_rdp->cpu,
+               rdp->nocb_next_cb_rdp ? rdp->nocb_next_cb_rdp->cpu : -1,
+               "kK"[!!rdp->nocb_cb_kthread],
+               "bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)],
+               "cC"[!!atomic_read(&rdp->nocb_lock_contended)],
+               "lL"[raw_spin_is_locked(&rdp->nocb_lock)],
+               "sS"[!!rdp->nocb_cb_sleep],
+               ".W"[swait_active(&rdp->nocb_cb_wq)],
+               jiffies - rdp->nocb_bypass_first,
+               jiffies - rdp->nocb_nobypass_last,
+               rdp->nocb_nobypass_count,
+               ".D"[rcu_segcblist_ready_cbs(rsclp)],
+               ".W"[!rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL)],
+               rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL) ? "" : bufw,
+               ".R"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL)],
+               rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL) ? "" : bufr,
+               ".N"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_TAIL)],
+               ".B"[!!rcu_cblist_n_cbs(&rdp->nocb_bypass)],
+               rcu_segcblist_n_cbs(&rdp->cblist),
+               rdp->nocb_cb_kthread ? task_state_to_char(rdp->nocb_cb_kthread) : '.',
+               rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
+               show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
+
+       /* It is OK for GP kthreads to have GP state. */
+       if (rdp->nocb_gp_rdp == rdp)
+               return;
+
+       waslocked = raw_spin_is_locked(&rdp->nocb_gp_lock);
+       wassleep = swait_active(&rdp->nocb_gp_wq);
+       if (!rdp->nocb_gp_sleep && !waslocked && !wassleep)
+               return;  /* Nothing untoward. */
+
+       pr_info("   nocb GP activity on CB-only CPU!!! %c%c%c %c\n",
+               "lL"[waslocked],
+               "dD"[!!rdp->nocb_defer_wakeup],
+               "sS"[!!rdp->nocb_gp_sleep],
+               ".W"[wassleep]);
+}
+
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+
+static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
+{
+       return 0;
+}
+
+static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
+{
+       return false;
+}
+
+/* No ->nocb_lock to acquire.  */
+static void rcu_nocb_lock(struct rcu_data *rdp)
+{
+}
+
+/* No ->nocb_lock to release.  */
+static void rcu_nocb_unlock(struct rcu_data *rdp)
+{
+}
+
+/* No ->nocb_lock to release.  */
+static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
+                                      unsigned long flags)
+{
+       local_irq_restore(flags);
+}
+
+/* Lockdep check that ->cblist may be safely accessed. */
+static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
+{
+       lockdep_assert_irqs_disabled();
+}
+
+static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
+{
+}
+
+static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
+{
+       return NULL;
+}
+
+static void rcu_init_one_nocb(struct rcu_node *rnp)
+{
+}
+
+static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+                                 unsigned long j)
+{
+       return true;
+}
+
+static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
+                               bool *was_alldone, unsigned long flags)
+{
+       return false;
+}
+
+static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
+                                unsigned long flags)
+{
+       WARN_ON_ONCE(1);  /* Should be dead code! */
+}
+
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+}
+
+static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
+{
+       return false;
+}
+
+static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
+{
+       return false;
+}
+
+static void rcu_spawn_cpu_nocb_kthread(int cpu)
+{
+}
+
+static void __init rcu_spawn_nocb_kthreads(void)
+{
+}
+
+static void show_rcu_nocb_state(struct rcu_data *rdp)
+{
+}
+
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
index de1dc3b..7a4876a 100644 (file)
 
 #include "../locking/rtmutex_common.h"
 
-#ifdef CONFIG_RCU_NOCB_CPU
-static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */
-static bool __read_mostly rcu_nocb_poll;    /* Offload kthread are to poll. */
-static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
-{
-       return lockdep_is_held(&rdp->nocb_lock);
-}
-
-static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
-{
-       /* Race on early boot between thread creation and assignment */
-       if (!rdp->nocb_cb_kthread || !rdp->nocb_gp_kthread)
-               return true;
-
-       if (current == rdp->nocb_cb_kthread || current == rdp->nocb_gp_kthread)
-               if (in_task())
-                       return true;
-       return false;
-}
-
-#else
-static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
-{
-       return 0;
-}
-
-static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
-{
-       return false;
-}
-
-#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
-
 static bool rcu_rdp_is_offloaded(struct rcu_data *rdp)
 {
        /*
@@ -346,7 +313,7 @@ void rcu_note_context_switch(bool preempt)
 
        trace_rcu_utilization(TPS("Start context switch"));
        lockdep_assert_irqs_disabled();
-       WARN_ON_ONCE(!preempt && rcu_preempt_depth() > 0);
+       WARN_ONCE(!preempt && rcu_preempt_depth() > 0, "Voluntary context switch within RCU read-side critical section!");
        if (rcu_preempt_depth() > 0 &&
            !t->rcu_read_unlock_special.b.blocked) {
 
@@ -405,17 +372,20 @@ static int rcu_preempt_blocked_readers_cgp(struct rcu_node *rnp)
 
 static void rcu_preempt_read_enter(void)
 {
-       current->rcu_read_lock_nesting++;
+       WRITE_ONCE(current->rcu_read_lock_nesting, READ_ONCE(current->rcu_read_lock_nesting) + 1);
 }
 
 static int rcu_preempt_read_exit(void)
 {
-       return --current->rcu_read_lock_nesting;
+       int ret = READ_ONCE(current->rcu_read_lock_nesting) - 1;
+
+       WRITE_ONCE(current->rcu_read_lock_nesting, ret);
+       return ret;
 }
 
 static void rcu_preempt_depth_set(int val)
 {
-       current->rcu_read_lock_nesting = val;
+       WRITE_ONCE(current->rcu_read_lock_nesting, val);
 }
 
 /*
@@ -1479,1460 +1449,6 @@ static void rcu_cleanup_after_idle(void)
 
 #endif /* #else #if !defined(CONFIG_RCU_FAST_NO_HZ) */
 
-#ifdef CONFIG_RCU_NOCB_CPU
-
-/*
- * Offload callback processing from the boot-time-specified set of CPUs
- * specified by rcu_nocb_mask.  For the CPUs in the set, there are kthreads
- * created that pull the callbacks from the corresponding CPU, wait for
- * a grace period to elapse, and invoke the callbacks.  These kthreads
- * are organized into GP kthreads, which manage incoming callbacks, wait for
- * grace periods, and awaken CB kthreads, and the CB kthreads, which only
- * invoke callbacks.  Each GP kthread invokes its own CBs.  The no-CBs CPUs
- * do a wake_up() on their GP kthread when they insert a callback into any
- * empty list, unless the rcu_nocb_poll boot parameter has been specified,
- * in which case each kthread actively polls its CPU.  (Which isn't so great
- * for energy efficiency, but which does reduce RCU's overhead on that CPU.)
- *
- * This is intended to be used in conjunction with Frederic Weisbecker's
- * adaptive-idle work, which would seriously reduce OS jitter on CPUs
- * running CPU-bound user-mode computations.
- *
- * Offloading of callbacks can also be used as an energy-efficiency
- * measure because CPUs with no RCU callbacks queued are more aggressive
- * about entering dyntick-idle mode.
- */
-
-
-/*
- * Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters.
- * If the list is invalid, a warning is emitted and all CPUs are offloaded.
- */
-static int __init rcu_nocb_setup(char *str)
-{
-       alloc_bootmem_cpumask_var(&rcu_nocb_mask);
-       if (cpulist_parse(str, rcu_nocb_mask)) {
-               pr_warn("rcu_nocbs= bad CPU range, all CPUs set\n");
-               cpumask_setall(rcu_nocb_mask);
-       }
-       return 1;
-}
-__setup("rcu_nocbs=", rcu_nocb_setup);
-
-static int __init parse_rcu_nocb_poll(char *arg)
-{
-       rcu_nocb_poll = true;
-       return 0;
-}
-early_param("rcu_nocb_poll", parse_rcu_nocb_poll);
-
-/*
- * Don't bother bypassing ->cblist if the call_rcu() rate is low.
- * After all, the main point of bypassing is to avoid lock contention
- * on ->nocb_lock, which only can happen at high call_rcu() rates.
- */
-static int nocb_nobypass_lim_per_jiffy = 16 * 1000 / HZ;
-module_param(nocb_nobypass_lim_per_jiffy, int, 0);
-
-/*
- * Acquire the specified rcu_data structure's ->nocb_bypass_lock.  If the
- * lock isn't immediately available, increment ->nocb_lock_contended to
- * flag the contention.
- */
-static void rcu_nocb_bypass_lock(struct rcu_data *rdp)
-       __acquires(&rdp->nocb_bypass_lock)
-{
-       lockdep_assert_irqs_disabled();
-       if (raw_spin_trylock(&rdp->nocb_bypass_lock))
-               return;
-       atomic_inc(&rdp->nocb_lock_contended);
-       WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
-       smp_mb__after_atomic(); /* atomic_inc() before lock. */
-       raw_spin_lock(&rdp->nocb_bypass_lock);
-       smp_mb__before_atomic(); /* atomic_dec() after lock. */
-       atomic_dec(&rdp->nocb_lock_contended);
-}
-
-/*
- * Spinwait until the specified rcu_data structure's ->nocb_lock is
- * not contended.  Please note that this is extremely special-purpose,
- * relying on the fact that at most two kthreads and one CPU contend for
- * this lock, and also that the two kthreads are guaranteed to have frequent
- * grace-period-duration time intervals between successive acquisitions
- * of the lock.  This allows us to use an extremely simple throttling
- * mechanism, and further to apply it only to the CPU doing floods of
- * call_rcu() invocations.  Don't try this at home!
- */
-static void rcu_nocb_wait_contended(struct rcu_data *rdp)
-{
-       WARN_ON_ONCE(smp_processor_id() != rdp->cpu);
-       while (WARN_ON_ONCE(atomic_read(&rdp->nocb_lock_contended)))
-               cpu_relax();
-}
-
-/*
- * Conditionally acquire the specified rcu_data structure's
- * ->nocb_bypass_lock.
- */
-static bool rcu_nocb_bypass_trylock(struct rcu_data *rdp)
-{
-       lockdep_assert_irqs_disabled();
-       return raw_spin_trylock(&rdp->nocb_bypass_lock);
-}
-
-/*
- * Release the specified rcu_data structure's ->nocb_bypass_lock.
- */
-static void rcu_nocb_bypass_unlock(struct rcu_data *rdp)
-       __releases(&rdp->nocb_bypass_lock)
-{
-       lockdep_assert_irqs_disabled();
-       raw_spin_unlock(&rdp->nocb_bypass_lock);
-}
-
-/*
- * Acquire the specified rcu_data structure's ->nocb_lock, but only
- * if it corresponds to a no-CBs CPU.
- */
-static void rcu_nocb_lock(struct rcu_data *rdp)
-{
-       lockdep_assert_irqs_disabled();
-       if (!rcu_rdp_is_offloaded(rdp))
-               return;
-       raw_spin_lock(&rdp->nocb_lock);
-}
-
-/*
- * Release the specified rcu_data structure's ->nocb_lock, but only
- * if it corresponds to a no-CBs CPU.
- */
-static void rcu_nocb_unlock(struct rcu_data *rdp)
-{
-       if (rcu_rdp_is_offloaded(rdp)) {
-               lockdep_assert_irqs_disabled();
-               raw_spin_unlock(&rdp->nocb_lock);
-       }
-}
-
-/*
- * Release the specified rcu_data structure's ->nocb_lock and restore
- * interrupts, but only if it corresponds to a no-CBs CPU.
- */
-static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
-                                      unsigned long flags)
-{
-       if (rcu_rdp_is_offloaded(rdp)) {
-               lockdep_assert_irqs_disabled();
-               raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
-       } else {
-               local_irq_restore(flags);
-       }
-}
-
-/* Lockdep check that ->cblist may be safely accessed. */
-static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
-{
-       lockdep_assert_irqs_disabled();
-       if (rcu_rdp_is_offloaded(rdp))
-               lockdep_assert_held(&rdp->nocb_lock);
-}
-
-/*
- * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended
- * grace period.
- */
-static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
-{
-       swake_up_all(sq);
-}
-
-static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
-{
-       return &rnp->nocb_gp_wq[rcu_seq_ctr(rnp->gp_seq) & 0x1];
-}
-
-static void rcu_init_one_nocb(struct rcu_node *rnp)
-{
-       init_swait_queue_head(&rnp->nocb_gp_wq[0]);
-       init_swait_queue_head(&rnp->nocb_gp_wq[1]);
-}
-
-/* Is the specified CPU a no-CBs CPU? */
-bool rcu_is_nocb_cpu(int cpu)
-{
-       if (cpumask_available(rcu_nocb_mask))
-               return cpumask_test_cpu(cpu, rcu_nocb_mask);
-       return false;
-}
-
-static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
-                          struct rcu_data *rdp,
-                          bool force, unsigned long flags)
-       __releases(rdp_gp->nocb_gp_lock)
-{
-       bool needwake = false;
-
-       if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) {
-               raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-                                   TPS("AlreadyAwake"));
-               return false;
-       }
-
-       if (rdp_gp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
-               WRITE_ONCE(rdp_gp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
-               del_timer(&rdp_gp->nocb_timer);
-       }
-
-       if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) {
-               WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
-               needwake = true;
-       }
-       raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-       if (needwake) {
-               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake"));
-               wake_up_process(rdp_gp->nocb_gp_kthread);
-       }
-
-       return needwake;
-}
-
-/*
- * Kick the GP kthread for this NOCB group.
- */
-static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
-{
-       unsigned long flags;
-       struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
-
-       raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-       return __wake_nocb_gp(rdp_gp, rdp, force, flags);
-}
-
-/*
- * Arrange to wake the GP kthread for this NOCB group at some future
- * time when it is safe to do so.
- */
-static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
-                              const char *reason)
-{
-       unsigned long flags;
-       struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
-
-       raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-
-       /*
-        * Bypass wakeup overrides previous deferments. In case
-        * of callback storm, no need to wake up too early.
-        */
-       if (waketype == RCU_NOCB_WAKE_BYPASS) {
-               mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
-               WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
-       } else {
-               if (rdp_gp->nocb_defer_wakeup < RCU_NOCB_WAKE)
-                       mod_timer(&rdp_gp->nocb_timer, jiffies + 1);
-               if (rdp_gp->nocb_defer_wakeup < waketype)
-                       WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
-       }
-
-       raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-
-       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
-}
-
-/*
- * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
- * However, if there is a callback to be enqueued and if ->nocb_bypass
- * proves to be initially empty, just return false because the no-CB GP
- * kthread may need to be awakened in this case.
- *
- * Note that this function always returns true if rhp is NULL.
- */
-static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-                                    unsigned long j)
-{
-       struct rcu_cblist rcl;
-
-       WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp));
-       rcu_lockdep_assert_cblist_protected(rdp);
-       lockdep_assert_held(&rdp->nocb_bypass_lock);
-       if (rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass)) {
-               raw_spin_unlock(&rdp->nocb_bypass_lock);
-               return false;
-       }
-       /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
-       if (rhp)
-               rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
-       rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
-       rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
-       WRITE_ONCE(rdp->nocb_bypass_first, j);
-       rcu_nocb_bypass_unlock(rdp);
-       return true;
-}
-
-/*
- * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
- * However, if there is a callback to be enqueued and if ->nocb_bypass
- * proves to be initially empty, just return false because the no-CB GP
- * kthread may need to be awakened in this case.
- *
- * Note that this function always returns true if rhp is NULL.
- */
-static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-                                 unsigned long j)
-{
-       if (!rcu_rdp_is_offloaded(rdp))
-               return true;
-       rcu_lockdep_assert_cblist_protected(rdp);
-       rcu_nocb_bypass_lock(rdp);
-       return rcu_nocb_do_flush_bypass(rdp, rhp, j);
-}
-
-/*
- * If the ->nocb_bypass_lock is immediately available, flush the
- * ->nocb_bypass queue into ->cblist.
- */
-static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
-{
-       rcu_lockdep_assert_cblist_protected(rdp);
-       if (!rcu_rdp_is_offloaded(rdp) ||
-           !rcu_nocb_bypass_trylock(rdp))
-               return;
-       WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
-}
-
-/*
- * See whether it is appropriate to use the ->nocb_bypass list in order
- * to control contention on ->nocb_lock.  A limited number of direct
- * enqueues are permitted into ->cblist per jiffy.  If ->nocb_bypass
- * is non-empty, further callbacks must be placed into ->nocb_bypass,
- * otherwise rcu_barrier() breaks.  Use rcu_nocb_flush_bypass() to switch
- * back to direct use of ->cblist.  However, ->nocb_bypass should not be
- * used if ->cblist is empty, because otherwise callbacks can be stranded
- * on ->nocb_bypass because we cannot count on the current CPU ever again
- * invoking call_rcu().  The general rule is that if ->nocb_bypass is
- * non-empty, the corresponding no-CBs grace-period kthread must not be
- * in an indefinite sleep state.
- *
- * Finally, it is not permitted to use the bypass during early boot,
- * as doing so would confuse the auto-initialization code.  Besides
- * which, there is no point in worrying about lock contention while
- * there is only one CPU in operation.
- */
-static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-                               bool *was_alldone, unsigned long flags)
-{
-       unsigned long c;
-       unsigned long cur_gp_seq;
-       unsigned long j = jiffies;
-       long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-
-       lockdep_assert_irqs_disabled();
-
-       // Pure softirq/rcuc based processing: no bypassing, no
-       // locking.
-       if (!rcu_rdp_is_offloaded(rdp)) {
-               *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-               return false;
-       }
-
-       // In the process of (de-)offloading: no bypassing, but
-       // locking.
-       if (!rcu_segcblist_completely_offloaded(&rdp->cblist)) {
-               rcu_nocb_lock(rdp);
-               *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-               return false; /* Not offloaded, no bypassing. */
-       }
-
-       // Don't use ->nocb_bypass during early boot.
-       if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) {
-               rcu_nocb_lock(rdp);
-               WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
-               *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-               return false;
-       }
-
-       // If we have advanced to a new jiffy, reset counts to allow
-       // moving back from ->nocb_bypass to ->cblist.
-       if (j == rdp->nocb_nobypass_last) {
-               c = rdp->nocb_nobypass_count + 1;
-       } else {
-               WRITE_ONCE(rdp->nocb_nobypass_last, j);
-               c = rdp->nocb_nobypass_count - nocb_nobypass_lim_per_jiffy;
-               if (ULONG_CMP_LT(rdp->nocb_nobypass_count,
-                                nocb_nobypass_lim_per_jiffy))
-                       c = 0;
-               else if (c > nocb_nobypass_lim_per_jiffy)
-                       c = nocb_nobypass_lim_per_jiffy;
-       }
-       WRITE_ONCE(rdp->nocb_nobypass_count, c);
-
-       // If there hasn't yet been all that many ->cblist enqueues
-       // this jiffy, tell the caller to enqueue onto ->cblist.  But flush
-       // ->nocb_bypass first.
-       if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
-               rcu_nocb_lock(rdp);
-               *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-               if (*was_alldone)
-                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-                                           TPS("FirstQ"));
-               WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
-               WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
-               return false; // Caller must enqueue the callback.
-       }
-
-       // If ->nocb_bypass has been used too long or is too full,
-       // flush ->nocb_bypass to ->cblist.
-       if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
-           ncbs >= qhimark) {
-               rcu_nocb_lock(rdp);
-               if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
-                       *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
-                       if (*was_alldone)
-                               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-                                                   TPS("FirstQ"));
-                       WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
-                       return false; // Caller must enqueue the callback.
-               }
-               if (j != rdp->nocb_gp_adv_time &&
-                   rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
-                   rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
-                       rcu_advance_cbs_nowake(rdp->mynode, rdp);
-                       rdp->nocb_gp_adv_time = j;
-               }
-               rcu_nocb_unlock_irqrestore(rdp, flags);
-               return true; // Callback already enqueued.
-       }
-
-       // We need to use the bypass.
-       rcu_nocb_wait_contended(rdp);
-       rcu_nocb_bypass_lock(rdp);
-       ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-       rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
-       rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
-       if (!ncbs) {
-               WRITE_ONCE(rdp->nocb_bypass_first, j);
-               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
-       }
-       rcu_nocb_bypass_unlock(rdp);
-       smp_mb(); /* Order enqueue before wake. */
-       if (ncbs) {
-               local_irq_restore(flags);
-       } else {
-               // No-CBs GP kthread might be indefinitely asleep, if so, wake.
-               rcu_nocb_lock(rdp); // Rare during call_rcu() flood.
-               if (!rcu_segcblist_pend_cbs(&rdp->cblist)) {
-                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-                                           TPS("FirstBQwake"));
-                       __call_rcu_nocb_wake(rdp, true, flags);
-               } else {
-                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-                                           TPS("FirstBQnoWake"));
-                       rcu_nocb_unlock_irqrestore(rdp, flags);
-               }
-       }
-       return true; // Callback already enqueued.
-}
-
-/*
- * Awaken the no-CBs grace-period kthread if needed, either due to it
- * legitimately being asleep or due to overload conditions.
- *
- * If warranted, also wake up the kthread servicing this CPUs queues.
- */
-static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
-                                unsigned long flags)
-                                __releases(rdp->nocb_lock)
-{
-       unsigned long cur_gp_seq;
-       unsigned long j;
-       long len;
-       struct task_struct *t;
-
-       // If we are being polled or there is no kthread, just leave.
-       t = READ_ONCE(rdp->nocb_gp_kthread);
-       if (rcu_nocb_poll || !t) {
-               rcu_nocb_unlock_irqrestore(rdp, flags);
-               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-                                   TPS("WakeNotPoll"));
-               return;
-       }
-       // Need to actually to a wakeup.
-       len = rcu_segcblist_n_cbs(&rdp->cblist);
-       if (was_alldone) {
-               rdp->qlen_last_fqs_check = len;
-               if (!irqs_disabled_flags(flags)) {
-                       /* ... if queue was empty ... */
-                       rcu_nocb_unlock_irqrestore(rdp, flags);
-                       wake_nocb_gp(rdp, false);
-                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-                                           TPS("WakeEmpty"));
-               } else {
-                       rcu_nocb_unlock_irqrestore(rdp, flags);
-                       wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
-                                          TPS("WakeEmptyIsDeferred"));
-               }
-       } else if (len > rdp->qlen_last_fqs_check + qhimark) {
-               /* ... or if many callbacks queued. */
-               rdp->qlen_last_fqs_check = len;
-               j = jiffies;
-               if (j != rdp->nocb_gp_adv_time &&
-                   rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
-                   rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
-                       rcu_advance_cbs_nowake(rdp->mynode, rdp);
-                       rdp->nocb_gp_adv_time = j;
-               }
-               smp_mb(); /* Enqueue before timer_pending(). */
-               if ((rdp->nocb_cb_sleep ||
-                    !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
-                   !timer_pending(&rdp->nocb_timer)) {
-                       rcu_nocb_unlock_irqrestore(rdp, flags);
-                       wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
-                                          TPS("WakeOvfIsDeferred"));
-               } else {
-                       rcu_nocb_unlock_irqrestore(rdp, flags);
-                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
-               }
-       } else {
-               rcu_nocb_unlock_irqrestore(rdp, flags);
-               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
-       }
-       return;
-}
-
-/*
- * Check if we ignore this rdp.
- *
- * We check that without holding the nocb lock but
- * we make sure not to miss a freshly offloaded rdp
- * with the current ordering:
- *
- *  rdp_offload_toggle()        nocb_gp_enabled_cb()
- * -------------------------   ----------------------------
- *    WRITE flags                 LOCK nocb_gp_lock
- *    LOCK nocb_gp_lock           READ/WRITE nocb_gp_sleep
- *    READ/WRITE nocb_gp_sleep    UNLOCK nocb_gp_lock
- *    UNLOCK nocb_gp_lock         READ flags
- */
-static inline bool nocb_gp_enabled_cb(struct rcu_data *rdp)
-{
-       u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_GP;
-
-       return rcu_segcblist_test_flags(&rdp->cblist, flags);
-}
-
-static inline bool nocb_gp_update_state_deoffloading(struct rcu_data *rdp,
-                                                    bool *needwake_state)
-{
-       struct rcu_segcblist *cblist = &rdp->cblist;
-
-       if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
-               if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
-                       rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
-                       if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
-                               *needwake_state = true;
-               }
-               return false;
-       }
-
-       /*
-        * De-offloading. Clear our flag and notify the de-offload worker.
-        * We will ignore this rdp until it ever gets re-offloaded.
-        */
-       WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
-       rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
-       if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
-               *needwake_state = true;
-       return true;
-}
-
-
-/*
- * No-CBs GP kthreads come here to wait for additional callbacks to show up
- * or for grace periods to end.
- */
-static void nocb_gp_wait(struct rcu_data *my_rdp)
-{
-       bool bypass = false;
-       long bypass_ncbs;
-       int __maybe_unused cpu = my_rdp->cpu;
-       unsigned long cur_gp_seq;
-       unsigned long flags;
-       bool gotcbs = false;
-       unsigned long j = jiffies;
-       bool needwait_gp = false; // This prevents actual uninitialized use.
-       bool needwake;
-       bool needwake_gp;
-       struct rcu_data *rdp;
-       struct rcu_node *rnp;
-       unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning.
-       bool wasempty = false;
-
-       /*
-        * Each pass through the following loop checks for CBs and for the
-        * nearest grace period (if any) to wait for next.  The CB kthreads
-        * and the global grace-period kthread are awakened if needed.
-        */
-       WARN_ON_ONCE(my_rdp->nocb_gp_rdp != my_rdp);
-       for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) {
-               bool needwake_state = false;
-
-               if (!nocb_gp_enabled_cb(rdp))
-                       continue;
-               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
-               rcu_nocb_lock_irqsave(rdp, flags);
-               if (nocb_gp_update_state_deoffloading(rdp, &needwake_state)) {
-                       rcu_nocb_unlock_irqrestore(rdp, flags);
-                       if (needwake_state)
-                               swake_up_one(&rdp->nocb_state_wq);
-                       continue;
-               }
-               bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-               if (bypass_ncbs &&
-                   (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
-                    bypass_ncbs > 2 * qhimark)) {
-                       // Bypass full or old, so flush it.
-                       (void)rcu_nocb_try_flush_bypass(rdp, j);
-                       bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
-               } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
-                       rcu_nocb_unlock_irqrestore(rdp, flags);
-                       if (needwake_state)
-                               swake_up_one(&rdp->nocb_state_wq);
-                       continue; /* No callbacks here, try next. */
-               }
-               if (bypass_ncbs) {
-                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-                                           TPS("Bypass"));
-                       bypass = true;
-               }
-               rnp = rdp->mynode;
-
-               // Advance callbacks if helpful and low contention.
-               needwake_gp = false;
-               if (!rcu_segcblist_restempty(&rdp->cblist,
-                                            RCU_NEXT_READY_TAIL) ||
-                   (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
-                    rcu_seq_done(&rnp->gp_seq, cur_gp_seq))) {
-                       raw_spin_lock_rcu_node(rnp); /* irqs disabled. */
-                       needwake_gp = rcu_advance_cbs(rnp, rdp);
-                       wasempty = rcu_segcblist_restempty(&rdp->cblist,
-                                                          RCU_NEXT_READY_TAIL);
-                       raw_spin_unlock_rcu_node(rnp); /* irqs disabled. */
-               }
-               // Need to wait on some grace period?
-               WARN_ON_ONCE(wasempty &&
-                            !rcu_segcblist_restempty(&rdp->cblist,
-                                                     RCU_NEXT_READY_TAIL));
-               if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) {
-                       if (!needwait_gp ||
-                           ULONG_CMP_LT(cur_gp_seq, wait_gp_seq))
-                               wait_gp_seq = cur_gp_seq;
-                       needwait_gp = true;
-                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
-                                           TPS("NeedWaitGP"));
-               }
-               if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
-                       needwake = rdp->nocb_cb_sleep;
-                       WRITE_ONCE(rdp->nocb_cb_sleep, false);
-                       smp_mb(); /* CB invocation -after- GP end. */
-               } else {
-                       needwake = false;
-               }
-               rcu_nocb_unlock_irqrestore(rdp, flags);
-               if (needwake) {
-                       swake_up_one(&rdp->nocb_cb_wq);
-                       gotcbs = true;
-               }
-               if (needwake_gp)
-                       rcu_gp_kthread_wake();
-               if (needwake_state)
-                       swake_up_one(&rdp->nocb_state_wq);
-       }
-
-       my_rdp->nocb_gp_bypass = bypass;
-       my_rdp->nocb_gp_gp = needwait_gp;
-       my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
-
-       if (bypass && !rcu_nocb_poll) {
-               // At least one child with non-empty ->nocb_bypass, so set
-               // timer in order to avoid stranding its callbacks.
-               wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
-                                  TPS("WakeBypassIsDeferred"));
-       }
-       if (rcu_nocb_poll) {
-               /* Polling, so trace if first poll in the series. */
-               if (gotcbs)
-                       trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll"));
-               schedule_timeout_idle(1);
-       } else if (!needwait_gp) {
-               /* Wait for callbacks to appear. */
-               trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep"));
-               swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq,
-                               !READ_ONCE(my_rdp->nocb_gp_sleep));
-               trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep"));
-       } else {
-               rnp = my_rdp->mynode;
-               trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait"));
-               swait_event_interruptible_exclusive(
-                       rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1],
-                       rcu_seq_done(&rnp->gp_seq, wait_gp_seq) ||
-                       !READ_ONCE(my_rdp->nocb_gp_sleep));
-               trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));
-       }
-       if (!rcu_nocb_poll) {
-               raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
-               if (my_rdp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
-                       WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
-                       del_timer(&my_rdp->nocb_timer);
-               }
-               WRITE_ONCE(my_rdp->nocb_gp_sleep, true);
-               raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
-       }
-       my_rdp->nocb_gp_seq = -1;
-       WARN_ON(signal_pending(current));
-}
-
-/*
- * No-CBs grace-period-wait kthread.  There is one of these per group
- * of CPUs, but only once at least one CPU in that group has come online
- * at least once since boot.  This kthread checks for newly posted
- * callbacks from any of the CPUs it is responsible for, waits for a
- * grace period, then awakens all of the rcu_nocb_cb_kthread() instances
- * that then have callback-invocation work to do.
- */
-static int rcu_nocb_gp_kthread(void *arg)
-{
-       struct rcu_data *rdp = arg;
-
-       for (;;) {
-               WRITE_ONCE(rdp->nocb_gp_loops, rdp->nocb_gp_loops + 1);
-               nocb_gp_wait(rdp);
-               cond_resched_tasks_rcu_qs();
-       }
-       return 0;
-}
-
-static inline bool nocb_cb_can_run(struct rcu_data *rdp)
-{
-       u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_CB;
-       return rcu_segcblist_test_flags(&rdp->cblist, flags);
-}
-
-static inline bool nocb_cb_wait_cond(struct rcu_data *rdp)
-{
-       return nocb_cb_can_run(rdp) && !READ_ONCE(rdp->nocb_cb_sleep);
-}
-
-/*
- * Invoke any ready callbacks from the corresponding no-CBs CPU,
- * then, if there are no more, wait for more to appear.
- */
-static void nocb_cb_wait(struct rcu_data *rdp)
-{
-       struct rcu_segcblist *cblist = &rdp->cblist;
-       unsigned long cur_gp_seq;
-       unsigned long flags;
-       bool needwake_state = false;
-       bool needwake_gp = false;
-       bool can_sleep = true;
-       struct rcu_node *rnp = rdp->mynode;
-
-       local_irq_save(flags);
-       rcu_momentary_dyntick_idle();
-       local_irq_restore(flags);
-       /*
-        * Disable BH to provide the expected environment.  Also, when
-        * transitioning to/from NOCB mode, a self-requeuing callback might
-        * be invoked from softirq.  A short grace period could cause both
-        * instances of this callback would execute concurrently.
-        */
-       local_bh_disable();
-       rcu_do_batch(rdp);
-       local_bh_enable();
-       lockdep_assert_irqs_enabled();
-       rcu_nocb_lock_irqsave(rdp, flags);
-       if (rcu_segcblist_nextgp(cblist, &cur_gp_seq) &&
-           rcu_seq_done(&rnp->gp_seq, cur_gp_seq) &&
-           raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */
-               needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
-               raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
-       }
-
-       if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
-               if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB)) {
-                       rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_CB);
-                       if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
-                               needwake_state = true;
-               }
-               if (rcu_segcblist_ready_cbs(cblist))
-                       can_sleep = false;
-       } else {
-               /*
-                * De-offloading. Clear our flag and notify the de-offload worker.
-                * We won't touch the callbacks and keep sleeping until we ever
-                * get re-offloaded.
-                */
-               WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB));
-               rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_CB);
-               if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
-                       needwake_state = true;
-       }
-
-       WRITE_ONCE(rdp->nocb_cb_sleep, can_sleep);
-
-       if (rdp->nocb_cb_sleep)
-               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
-
-       rcu_nocb_unlock_irqrestore(rdp, flags);
-       if (needwake_gp)
-               rcu_gp_kthread_wake();
-
-       if (needwake_state)
-               swake_up_one(&rdp->nocb_state_wq);
-
-       do {
-               swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
-                                                   nocb_cb_wait_cond(rdp));
-
-               // VVV Ensure CB invocation follows _sleep test.
-               if (smp_load_acquire(&rdp->nocb_cb_sleep)) { // ^^^
-                       WARN_ON(signal_pending(current));
-                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
-               }
-       } while (!nocb_cb_can_run(rdp));
-}
-
-/*
- * Per-rcu_data kthread, but only for no-CBs CPUs.  Repeatedly invoke
- * nocb_cb_wait() to do the dirty work.
- */
-static int rcu_nocb_cb_kthread(void *arg)
-{
-       struct rcu_data *rdp = arg;
-
-       // Each pass through this loop does one callback batch, and,
-       // if there are no more ready callbacks, waits for them.
-       for (;;) {
-               nocb_cb_wait(rdp);
-               cond_resched_tasks_rcu_qs();
-       }
-       return 0;
-}
-
-/* Is a deferred wakeup of rcu_nocb_kthread() required? */
-static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
-{
-       return READ_ONCE(rdp->nocb_defer_wakeup) >= level;
-}
-
-/* Do a deferred wakeup of rcu_nocb_kthread(). */
-static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp_gp,
-                                          struct rcu_data *rdp, int level,
-                                          unsigned long flags)
-       __releases(rdp_gp->nocb_gp_lock)
-{
-       int ndw;
-       int ret;
-
-       if (!rcu_nocb_need_deferred_wakeup(rdp_gp, level)) {
-               raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-               return false;
-       }
-
-       ndw = rdp_gp->nocb_defer_wakeup;
-       ret = __wake_nocb_gp(rdp_gp, rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
-       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
-
-       return ret;
-}
-
-/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
-static void do_nocb_deferred_wakeup_timer(struct timer_list *t)
-{
-       unsigned long flags;
-       struct rcu_data *rdp = from_timer(rdp, t, nocb_timer);
-
-       WARN_ON_ONCE(rdp->nocb_gp_rdp != rdp);
-       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
-
-       raw_spin_lock_irqsave(&rdp->nocb_gp_lock, flags);
-       smp_mb__after_spinlock(); /* Timer expire before wakeup. */
-       do_nocb_deferred_wakeup_common(rdp, rdp, RCU_NOCB_WAKE_BYPASS, flags);
-}
-
-/*
- * Do a deferred wakeup of rcu_nocb_kthread() from fastpath.
- * This means we do an inexact common-case check.  Note that if
- * we miss, ->nocb_timer will eventually clean things up.
- */
-static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
-{
-       unsigned long flags;
-       struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
-
-       if (!rdp_gp || !rcu_nocb_need_deferred_wakeup(rdp_gp, RCU_NOCB_WAKE))
-               return false;
-
-       raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-       return do_nocb_deferred_wakeup_common(rdp_gp, rdp, RCU_NOCB_WAKE, flags);
-}
-
-void rcu_nocb_flush_deferred_wakeup(void)
-{
-       do_nocb_deferred_wakeup(this_cpu_ptr(&rcu_data));
-}
-EXPORT_SYMBOL_GPL(rcu_nocb_flush_deferred_wakeup);
-
-static int rdp_offload_toggle(struct rcu_data *rdp,
-                              bool offload, unsigned long flags)
-       __releases(rdp->nocb_lock)
-{
-       struct rcu_segcblist *cblist = &rdp->cblist;
-       struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
-       bool wake_gp = false;
-
-       rcu_segcblist_offload(cblist, offload);
-
-       if (rdp->nocb_cb_sleep)
-               rdp->nocb_cb_sleep = false;
-       rcu_nocb_unlock_irqrestore(rdp, flags);
-
-       /*
-        * Ignore former value of nocb_cb_sleep and force wake up as it could
-        * have been spuriously set to false already.
-        */
-       swake_up_one(&rdp->nocb_cb_wq);
-
-       raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-       if (rdp_gp->nocb_gp_sleep) {
-               rdp_gp->nocb_gp_sleep = false;
-               wake_gp = true;
-       }
-       raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-
-       if (wake_gp)
-               wake_up_process(rdp_gp->nocb_gp_kthread);
-
-       return 0;
-}
-
-static long rcu_nocb_rdp_deoffload(void *arg)
-{
-       struct rcu_data *rdp = arg;
-       struct rcu_segcblist *cblist = &rdp->cblist;
-       unsigned long flags;
-       int ret;
-
-       WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
-
-       pr_info("De-offloading %d\n", rdp->cpu);
-
-       rcu_nocb_lock_irqsave(rdp, flags);
-       /*
-        * Flush once and for all now. This suffices because we are
-        * running on the target CPU holding ->nocb_lock (thus having
-        * interrupts disabled), and because rdp_offload_toggle()
-        * invokes rcu_segcblist_offload(), which clears SEGCBLIST_OFFLOADED.
-        * Thus future calls to rcu_segcblist_completely_offloaded() will
-        * return false, which means that future calls to rcu_nocb_try_bypass()
-        * will refuse to put anything into the bypass.
-        */
-       WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
-       ret = rdp_offload_toggle(rdp, false, flags);
-       swait_event_exclusive(rdp->nocb_state_wq,
-                             !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB |
-                                                       SEGCBLIST_KTHREAD_GP));
-       /*
-        * Lock one last time to acquire latest callback updates from kthreads
-        * so we can later handle callbacks locally without locking.
-        */
-       rcu_nocb_lock_irqsave(rdp, flags);
-       /*
-        * Theoretically we could set SEGCBLIST_SOFTIRQ_ONLY after the nocb
-        * lock is released but how about being paranoid for once?
-        */
-       rcu_segcblist_set_flags(cblist, SEGCBLIST_SOFTIRQ_ONLY);
-       /*
-        * With SEGCBLIST_SOFTIRQ_ONLY, we can't use
-        * rcu_nocb_unlock_irqrestore() anymore.
-        */
-       raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
-
-       /* Sanity check */
-       WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
-
-
-       return ret;
-}
-
-int rcu_nocb_cpu_deoffload(int cpu)
-{
-       struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
-       int ret = 0;
-
-       mutex_lock(&rcu_state.barrier_mutex);
-       cpus_read_lock();
-       if (rcu_rdp_is_offloaded(rdp)) {
-               if (cpu_online(cpu)) {
-                       ret = work_on_cpu(cpu, rcu_nocb_rdp_deoffload, rdp);
-                       if (!ret)
-                               cpumask_clear_cpu(cpu, rcu_nocb_mask);
-               } else {
-                       pr_info("NOCB: Can't CB-deoffload an offline CPU\n");
-                       ret = -EINVAL;
-               }
-       }
-       cpus_read_unlock();
-       mutex_unlock(&rcu_state.barrier_mutex);
-
-       return ret;
-}
-EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload);
-
-static long rcu_nocb_rdp_offload(void *arg)
-{
-       struct rcu_data *rdp = arg;
-       struct rcu_segcblist *cblist = &rdp->cblist;
-       unsigned long flags;
-       int ret;
-
-       WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
-       /*
-        * For now we only support re-offload, ie: the rdp must have been
-        * offloaded on boot first.
-        */
-       if (!rdp->nocb_gp_rdp)
-               return -EINVAL;
-
-       pr_info("Offloading %d\n", rdp->cpu);
-       /*
-        * Can't use rcu_nocb_lock_irqsave() while we are in
-        * SEGCBLIST_SOFTIRQ_ONLY mode.
-        */
-       raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
-
-       /*
-        * We didn't take the nocb lock while working on the
-        * rdp->cblist in SEGCBLIST_SOFTIRQ_ONLY mode.
-        * Every modifications that have been done previously on
-        * rdp->cblist must be visible remotely by the nocb kthreads
-        * upon wake up after reading the cblist flags.
-        *
-        * The layout against nocb_lock enforces that ordering:
-        *
-        *  __rcu_nocb_rdp_offload()   nocb_cb_wait()/nocb_gp_wait()
-        * -------------------------   ----------------------------
-        *      WRITE callbacks           rcu_nocb_lock()
-        *      rcu_nocb_lock()           READ flags
-        *      WRITE flags               READ callbacks
-        *      rcu_nocb_unlock()         rcu_nocb_unlock()
-        */
-       ret = rdp_offload_toggle(rdp, true, flags);
-       swait_event_exclusive(rdp->nocb_state_wq,
-                             rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB) &&
-                             rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
-
-       return ret;
-}
-
-int rcu_nocb_cpu_offload(int cpu)
-{
-       struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
-       int ret = 0;
-
-       mutex_lock(&rcu_state.barrier_mutex);
-       cpus_read_lock();
-       if (!rcu_rdp_is_offloaded(rdp)) {
-               if (cpu_online(cpu)) {
-                       ret = work_on_cpu(cpu, rcu_nocb_rdp_offload, rdp);
-                       if (!ret)
-                               cpumask_set_cpu(cpu, rcu_nocb_mask);
-               } else {
-                       pr_info("NOCB: Can't CB-offload an offline CPU\n");
-                       ret = -EINVAL;
-               }
-       }
-       cpus_read_unlock();
-       mutex_unlock(&rcu_state.barrier_mutex);
-
-       return ret;
-}
-EXPORT_SYMBOL_GPL(rcu_nocb_cpu_offload);
-
-void __init rcu_init_nohz(void)
-{
-       int cpu;
-       bool need_rcu_nocb_mask = false;
-       struct rcu_data *rdp;
-
-#if defined(CONFIG_NO_HZ_FULL)
-       if (tick_nohz_full_running && cpumask_weight(tick_nohz_full_mask))
-               need_rcu_nocb_mask = true;
-#endif /* #if defined(CONFIG_NO_HZ_FULL) */
-
-       if (!cpumask_available(rcu_nocb_mask) && need_rcu_nocb_mask) {
-               if (!zalloc_cpumask_var(&rcu_nocb_mask, GFP_KERNEL)) {
-                       pr_info("rcu_nocb_mask allocation failed, callback offloading disabled.\n");
-                       return;
-               }
-       }
-       if (!cpumask_available(rcu_nocb_mask))
-               return;
-
-#if defined(CONFIG_NO_HZ_FULL)
-       if (tick_nohz_full_running)
-               cpumask_or(rcu_nocb_mask, rcu_nocb_mask, tick_nohz_full_mask);
-#endif /* #if defined(CONFIG_NO_HZ_FULL) */
-
-       if (!cpumask_subset(rcu_nocb_mask, cpu_possible_mask)) {
-               pr_info("\tNote: kernel parameter 'rcu_nocbs=', 'nohz_full', or 'isolcpus=' contains nonexistent CPUs.\n");
-               cpumask_and(rcu_nocb_mask, cpu_possible_mask,
-                           rcu_nocb_mask);
-       }
-       if (cpumask_empty(rcu_nocb_mask))
-               pr_info("\tOffload RCU callbacks from CPUs: (none).\n");
-       else
-               pr_info("\tOffload RCU callbacks from CPUs: %*pbl.\n",
-                       cpumask_pr_args(rcu_nocb_mask));
-       if (rcu_nocb_poll)
-               pr_info("\tPoll for callbacks from no-CBs CPUs.\n");
-
-       for_each_cpu(cpu, rcu_nocb_mask) {
-               rdp = per_cpu_ptr(&rcu_data, cpu);
-               if (rcu_segcblist_empty(&rdp->cblist))
-                       rcu_segcblist_init(&rdp->cblist);
-               rcu_segcblist_offload(&rdp->cblist, true);
-               rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_CB);
-               rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
-       }
-       rcu_organize_nocb_kthreads();
-}
-
-/* Initialize per-rcu_data variables for no-CBs CPUs. */
-static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
-{
-       init_swait_queue_head(&rdp->nocb_cb_wq);
-       init_swait_queue_head(&rdp->nocb_gp_wq);
-       init_swait_queue_head(&rdp->nocb_state_wq);
-       raw_spin_lock_init(&rdp->nocb_lock);
-       raw_spin_lock_init(&rdp->nocb_bypass_lock);
-       raw_spin_lock_init(&rdp->nocb_gp_lock);
-       timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
-       rcu_cblist_init(&rdp->nocb_bypass);
-}
-
-/*
- * If the specified CPU is a no-CBs CPU that does not already have its
- * rcuo CB kthread, spawn it.  Additionally, if the rcuo GP kthread
- * for this CPU's group has not yet been created, spawn it as well.
- */
-static void rcu_spawn_one_nocb_kthread(int cpu)
-{
-       struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
-       struct rcu_data *rdp_gp;
-       struct task_struct *t;
-
-       /*
-        * If this isn't a no-CBs CPU or if it already has an rcuo kthread,
-        * then nothing to do.
-        */
-       if (!rcu_is_nocb_cpu(cpu) || rdp->nocb_cb_kthread)
-               return;
-
-       /* If we didn't spawn the GP kthread first, reorganize! */
-       rdp_gp = rdp->nocb_gp_rdp;
-       if (!rdp_gp->nocb_gp_kthread) {
-               t = kthread_run(rcu_nocb_gp_kthread, rdp_gp,
-                               "rcuog/%d", rdp_gp->cpu);
-               if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo GP kthread, OOM is now expected behavior\n", __func__))
-                       return;
-               WRITE_ONCE(rdp_gp->nocb_gp_kthread, t);
-       }
-
-       /* Spawn the kthread for this CPU. */
-       t = kthread_run(rcu_nocb_cb_kthread, rdp,
-                       "rcuo%c/%d", rcu_state.abbr, cpu);
-       if (WARN_ONCE(IS_ERR(t), "%s: Could not start rcuo CB kthread, OOM is now expected behavior\n", __func__))
-               return;
-       WRITE_ONCE(rdp->nocb_cb_kthread, t);
-       WRITE_ONCE(rdp->nocb_gp_kthread, rdp_gp->nocb_gp_kthread);
-}
-
-/*
- * If the specified CPU is a no-CBs CPU that does not already have its
- * rcuo kthread, spawn it.
- */
-static void rcu_spawn_cpu_nocb_kthread(int cpu)
-{
-       if (rcu_scheduler_fully_active)
-               rcu_spawn_one_nocb_kthread(cpu);
-}
-
-/*
- * Once the scheduler is running, spawn rcuo kthreads for all online
- * no-CBs CPUs.  This assumes that the early_initcall()s happen before
- * non-boot CPUs come online -- if this changes, we will need to add
- * some mutual exclusion.
- */
-static void __init rcu_spawn_nocb_kthreads(void)
-{
-       int cpu;
-
-       for_each_online_cpu(cpu)
-               rcu_spawn_cpu_nocb_kthread(cpu);
-}
-
-/* How many CB CPU IDs per GP kthread?  Default of -1 for sqrt(nr_cpu_ids). */
-static int rcu_nocb_gp_stride = -1;
-module_param(rcu_nocb_gp_stride, int, 0444);
-
-/*
- * Initialize GP-CB relationships for all no-CBs CPU.
- */
-static void __init rcu_organize_nocb_kthreads(void)
-{
-       int cpu;
-       bool firsttime = true;
-       bool gotnocbs = false;
-       bool gotnocbscbs = true;
-       int ls = rcu_nocb_gp_stride;
-       int nl = 0;  /* Next GP kthread. */
-       struct rcu_data *rdp;
-       struct rcu_data *rdp_gp = NULL;  /* Suppress misguided gcc warn. */
-       struct rcu_data *rdp_prev = NULL;
-
-       if (!cpumask_available(rcu_nocb_mask))
-               return;
-       if (ls == -1) {
-               ls = nr_cpu_ids / int_sqrt(nr_cpu_ids);
-               rcu_nocb_gp_stride = ls;
-       }
-
-       /*
-        * Each pass through this loop sets up one rcu_data structure.
-        * Should the corresponding CPU come online in the future, then
-        * we will spawn the needed set of rcu_nocb_kthread() kthreads.
-        */
-       for_each_cpu(cpu, rcu_nocb_mask) {
-               rdp = per_cpu_ptr(&rcu_data, cpu);
-               if (rdp->cpu >= nl) {
-                       /* New GP kthread, set up for CBs & next GP. */
-                       gotnocbs = true;
-                       nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
-                       rdp->nocb_gp_rdp = rdp;
-                       rdp_gp = rdp;
-                       if (dump_tree) {
-                               if (!firsttime)
-                                       pr_cont("%s\n", gotnocbscbs
-                                                       ? "" : " (self only)");
-                               gotnocbscbs = false;
-                               firsttime = false;
-                               pr_alert("%s: No-CB GP kthread CPU %d:",
-                                        __func__, cpu);
-                       }
-               } else {
-                       /* Another CB kthread, link to previous GP kthread. */
-                       gotnocbscbs = true;
-                       rdp->nocb_gp_rdp = rdp_gp;
-                       rdp_prev->nocb_next_cb_rdp = rdp;
-                       if (dump_tree)
-                               pr_cont(" %d", cpu);
-               }
-               rdp_prev = rdp;
-       }
-       if (gotnocbs && dump_tree)
-               pr_cont("%s\n", gotnocbscbs ? "" : " (self only)");
-}
-
-/*
- * Bind the current task to the offloaded CPUs.  If there are no offloaded
- * CPUs, leave the task unbound.  Splat if the bind attempt fails.
- */
-void rcu_bind_current_to_nocb(void)
-{
-       if (cpumask_available(rcu_nocb_mask) && cpumask_weight(rcu_nocb_mask))
-               WARN_ON(sched_setaffinity(current->pid, rcu_nocb_mask));
-}
-EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb);
-
-// The ->on_cpu field is available only in CONFIG_SMP=y, so...
-#ifdef CONFIG_SMP
-static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
-{
-       return tsp && task_is_running(tsp) && !tsp->on_cpu ? "!" : "";
-}
-#else // #ifdef CONFIG_SMP
-static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
-{
-       return "";
-}
-#endif // #else #ifdef CONFIG_SMP
-
-/*
- * Dump out nocb grace-period kthread state for the specified rcu_data
- * structure.
- */
-static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
-{
-       struct rcu_node *rnp = rdp->mynode;
-
-       pr_info("nocb GP %d %c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu %c CPU %d%s\n",
-               rdp->cpu,
-               "kK"[!!rdp->nocb_gp_kthread],
-               "lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)],
-               "dD"[!!rdp->nocb_defer_wakeup],
-               "tT"[timer_pending(&rdp->nocb_timer)],
-               "sS"[!!rdp->nocb_gp_sleep],
-               ".W"[swait_active(&rdp->nocb_gp_wq)],
-               ".W"[swait_active(&rnp->nocb_gp_wq[0])],
-               ".W"[swait_active(&rnp->nocb_gp_wq[1])],
-               ".B"[!!rdp->nocb_gp_bypass],
-               ".G"[!!rdp->nocb_gp_gp],
-               (long)rdp->nocb_gp_seq,
-               rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops),
-               rdp->nocb_gp_kthread ? task_state_to_char(rdp->nocb_gp_kthread) : '.',
-               rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
-               show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
-}
-
-/* Dump out nocb kthread state for the specified rcu_data structure. */
-static void show_rcu_nocb_state(struct rcu_data *rdp)
-{
-       char bufw[20];
-       char bufr[20];
-       struct rcu_segcblist *rsclp = &rdp->cblist;
-       bool waslocked;
-       bool wassleep;
-
-       if (rdp->nocb_gp_rdp == rdp)
-               show_rcu_nocb_gp_state(rdp);
-
-       sprintf(bufw, "%ld", rsclp->gp_seq[RCU_WAIT_TAIL]);
-       sprintf(bufr, "%ld", rsclp->gp_seq[RCU_NEXT_READY_TAIL]);
-       pr_info("   CB %d^%d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%s%c%s%c%c q%ld %c CPU %d%s\n",
-               rdp->cpu, rdp->nocb_gp_rdp->cpu,
-               rdp->nocb_next_cb_rdp ? rdp->nocb_next_cb_rdp->cpu : -1,
-               "kK"[!!rdp->nocb_cb_kthread],
-               "bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)],
-               "cC"[!!atomic_read(&rdp->nocb_lock_contended)],
-               "lL"[raw_spin_is_locked(&rdp->nocb_lock)],
-               "sS"[!!rdp->nocb_cb_sleep],
-               ".W"[swait_active(&rdp->nocb_cb_wq)],
-               jiffies - rdp->nocb_bypass_first,
-               jiffies - rdp->nocb_nobypass_last,
-               rdp->nocb_nobypass_count,
-               ".D"[rcu_segcblist_ready_cbs(rsclp)],
-               ".W"[!rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL)],
-               rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL) ? "" : bufw,
-               ".R"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL)],
-               rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL) ? "" : bufr,
-               ".N"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_TAIL)],
-               ".B"[!!rcu_cblist_n_cbs(&rdp->nocb_bypass)],
-               rcu_segcblist_n_cbs(&rdp->cblist),
-               rdp->nocb_cb_kthread ? task_state_to_char(rdp->nocb_cb_kthread) : '.',
-               rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
-               show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
-
-       /* It is OK for GP kthreads to have GP state. */
-       if (rdp->nocb_gp_rdp == rdp)
-               return;
-
-       waslocked = raw_spin_is_locked(&rdp->nocb_gp_lock);
-       wassleep = swait_active(&rdp->nocb_gp_wq);
-       if (!rdp->nocb_gp_sleep && !waslocked && !wassleep)
-               return;  /* Nothing untoward. */
-
-       pr_info("   nocb GP activity on CB-only CPU!!! %c%c%c %c\n",
-               "lL"[waslocked],
-               "dD"[!!rdp->nocb_defer_wakeup],
-               "sS"[!!rdp->nocb_gp_sleep],
-               ".W"[wassleep]);
-}
-
-#else /* #ifdef CONFIG_RCU_NOCB_CPU */
-
-/* No ->nocb_lock to acquire.  */
-static void rcu_nocb_lock(struct rcu_data *rdp)
-{
-}
-
-/* No ->nocb_lock to release.  */
-static void rcu_nocb_unlock(struct rcu_data *rdp)
-{
-}
-
-/* No ->nocb_lock to release.  */
-static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp,
-                                      unsigned long flags)
-{
-       local_irq_restore(flags);
-}
-
-/* Lockdep check that ->cblist may be safely accessed. */
-static void rcu_lockdep_assert_cblist_protected(struct rcu_data *rdp)
-{
-       lockdep_assert_irqs_disabled();
-}
-
-static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq)
-{
-}
-
-static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp)
-{
-       return NULL;
-}
-
-static void rcu_init_one_nocb(struct rcu_node *rnp)
-{
-}
-
-static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-                                 unsigned long j)
-{
-       return true;
-}
-
-static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
-                               bool *was_alldone, unsigned long flags)
-{
-       return false;
-}
-
-static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
-                                unsigned long flags)
-{
-       WARN_ON_ONCE(1);  /* Should be dead code! */
-}
-
-static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
-{
-}
-
-static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
-{
-       return false;
-}
-
-static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
-{
-       return false;
-}
-
-static void rcu_spawn_cpu_nocb_kthread(int cpu)
-{
-}
-
-static void __init rcu_spawn_nocb_kthreads(void)
-{
-}
-
-static void show_rcu_nocb_state(struct rcu_data *rdp)
-{
-}
-
-#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
-
 /*
  * Is this CPU a NO_HZ_FULL CPU that should ignore RCU so that the
  * grace-period kthread will do force_quiescent_state() processing?
@@ -2982,17 +1498,17 @@ static void noinstr rcu_dynticks_task_exit(void)
 /* Turn on heavyweight RCU tasks trace readers on idle/user entry. */
 static void rcu_dynticks_task_trace_enter(void)
 {
-#ifdef CONFIG_TASKS_RCU_TRACE
+#ifdef CONFIG_TASKS_TRACE_RCU
        if (IS_ENABLED(CONFIG_TASKS_TRACE_RCU_READ_MB))
                current->trc_reader_special.b.need_mb = true;
-#endif /* #ifdef CONFIG_TASKS_RCU_TRACE */
+#endif /* #ifdef CONFIG_TASKS_TRACE_RCU */
 }
 
 /* Turn off heavyweight RCU tasks trace readers on idle/user exit. */
 static void rcu_dynticks_task_trace_exit(void)
 {
-#ifdef CONFIG_TASKS_RCU_TRACE
+#ifdef CONFIG_TASKS_TRACE_RCU
        if (IS_ENABLED(CONFIG_TASKS_TRACE_RCU_READ_MB))
                current->trc_reader_special.b.need_mb = false;
-#endif /* #ifdef CONFIG_TASKS_RCU_TRACE */
+#endif /* #ifdef CONFIG_TASKS_TRACE_RCU */
 }
index 6c76988..677ee3d 100644 (file)
@@ -7,6 +7,8 @@
  * Author: Paul E. McKenney <paulmck@linux.ibm.com>
  */
 
+#include <linux/kvm_para.h>
+
 //////////////////////////////////////////////////////////////////////////////
 //
 // Controlling CPU stall warnings, including delay calculation.
@@ -117,17 +119,14 @@ static void panic_on_rcu_stall(void)
 }
 
 /**
- * rcu_cpu_stall_reset - prevent further stall warnings in current grace period
- *
- * Set the stall-warning timeout way off into the future, thus preventing
- * any RCU CPU stall-warning messages from appearing in the current set of
- * RCU grace periods.
+ * rcu_cpu_stall_reset - restart stall-warning timeout for current grace period
  *
  * The caller must disable hard irqs.
  */
 void rcu_cpu_stall_reset(void)
 {
-       WRITE_ONCE(rcu_state.jiffies_stall, jiffies + ULONG_MAX / 2);
+       WRITE_ONCE(rcu_state.jiffies_stall,
+                  jiffies + rcu_jiffies_till_stall_check());
 }
 
 //////////////////////////////////////////////////////////////////////////////
@@ -267,8 +266,10 @@ static int rcu_print_task_stall(struct rcu_node *rnp, unsigned long flags)
        struct task_struct *ts[8];
 
        lockdep_assert_irqs_disabled();
-       if (!rcu_preempt_blocked_readers_cgp(rnp))
+       if (!rcu_preempt_blocked_readers_cgp(rnp)) {
+               raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
                return 0;
+       }
        pr_err("\tTasks blocked on level-%d rcu_node (CPUs %d-%d):",
               rnp->level, rnp->grplo, rnp->grphi);
        t = list_entry(rnp->gp_tasks->prev,
@@ -280,8 +281,8 @@ static int rcu_print_task_stall(struct rcu_node *rnp, unsigned long flags)
                        break;
        }
        raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
-       for (i--; i; i--) {
-               t = ts[i];
+       while (i) {
+               t = ts[--i];
                if (!try_invoke_on_locked_down_task(t, check_slow_task, &rscr))
                        pr_cont(" P%d", t->pid);
                else
@@ -350,7 +351,7 @@ static void rcu_dump_cpu_stacks(void)
 
 static void print_cpu_stall_fast_no_hz(char *cp, int cpu)
 {
-       struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
+       struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
 
        sprintf(cp, "last_accelerate: %04lx/%04lx dyntick_enabled: %d",
                rdp->last_accelerate & 0xffff, jiffies & 0xffff,
@@ -464,9 +465,10 @@ static void rcu_check_gp_kthread_starvation(void)
                pr_err("%s kthread starved for %ld jiffies! g%ld f%#x %s(%d) ->state=%#x ->cpu=%d\n",
                       rcu_state.name, j,
                       (long)rcu_seq_current(&rcu_state.gp_seq),
-                      data_race(rcu_state.gp_flags),
-                      gp_state_getname(rcu_state.gp_state), rcu_state.gp_state,
-                      gpk ? gpk->__state : ~0, cpu);
+                      data_race(READ_ONCE(rcu_state.gp_flags)),
+                      gp_state_getname(rcu_state.gp_state),
+                      data_race(READ_ONCE(rcu_state.gp_state)),
+                      gpk ? data_race(READ_ONCE(gpk->__state)) : ~0, cpu);
                if (gpk) {
                        pr_err("\tUnless %s kthread gets sufficient CPU time, OOM is now expected behavior.\n", rcu_state.name);
                        pr_err("RCU grace-period kthread stack dump:\n");
@@ -509,7 +511,7 @@ static void rcu_check_gp_kthread_expired_fqs_timer(void)
                       (long)rcu_seq_current(&rcu_state.gp_seq),
                       data_race(rcu_state.gp_flags),
                       gp_state_getname(RCU_GP_WAIT_FQS), RCU_GP_WAIT_FQS,
-                      gpk->__state);
+                      data_race(READ_ONCE(gpk->__state)));
                pr_err("\tPossible timer handling issue on cpu=%d timer-softirq=%u\n",
                       cpu, kstat_softirqs_cpu(TIMER_SOFTIRQ, cpu));
        }
@@ -568,11 +570,11 @@ static void print_other_cpu_stall(unsigned long gp_seq, unsigned long gps)
                        pr_err("INFO: Stall ended before state dump start\n");
                } else {
                        j = jiffies;
-                       gpa = data_race(rcu_state.gp_activity);
+                       gpa = data_race(READ_ONCE(rcu_state.gp_activity));
                        pr_err("All QSes seen, last %s kthread activity %ld (%ld-%ld), jiffies_till_next_fqs=%ld, root ->qsmask %#lx\n",
                               rcu_state.name, j - gpa, j, gpa,
-                              data_race(jiffies_till_next_fqs),
-                              rcu_get_root()->qsmask);
+                              data_race(READ_ONCE(jiffies_till_next_fqs)),
+                              data_race(READ_ONCE(rcu_get_root()->qsmask)));
                }
        }
        /* Rewrite if needed in case of slow consoles. */
@@ -646,6 +648,7 @@ static void print_cpu_stall(unsigned long gps)
 
 static void check_cpu_stall(struct rcu_data *rdp)
 {
+       bool didstall = false;
        unsigned long gs1;
        unsigned long gs2;
        unsigned long gps;
@@ -691,24 +694,46 @@ static void check_cpu_stall(struct rcu_data *rdp)
            ULONG_CMP_GE(gps, js))
                return; /* No stall or GP completed since entering function. */
        rnp = rdp->mynode;
-       jn = jiffies + 3 * rcu_jiffies_till_stall_check() + 3;
+       jn = jiffies + ULONG_MAX / 2;
        if (rcu_gp_in_progress() &&
            (READ_ONCE(rnp->qsmask) & rdp->grpmask) &&
            cmpxchg(&rcu_state.jiffies_stall, js, jn) == js) {
 
+               /*
+                * If a virtual machine is stopped by the host it can look to
+                * the watchdog like an RCU stall. Check to see if the host
+                * stopped the vm.
+                */
+               if (kvm_check_and_clear_guest_paused())
+                       return;
+
                /* We haven't checked in, so go dump stack. */
                print_cpu_stall(gps);
                if (READ_ONCE(rcu_cpu_stall_ftrace_dump))
                        rcu_ftrace_dump(DUMP_ALL);
+               didstall = true;
 
        } else if (rcu_gp_in_progress() &&
                   ULONG_CMP_GE(j, js + RCU_STALL_RAT_DELAY) &&
                   cmpxchg(&rcu_state.jiffies_stall, js, jn) == js) {
 
+               /*
+                * If a virtual machine is stopped by the host it can look to
+                * the watchdog like an RCU stall. Check to see if the host
+                * stopped the vm.
+                */
+               if (kvm_check_and_clear_guest_paused())
+                       return;
+
                /* They had a few time units to dump stack, so complain. */
                print_other_cpu_stall(gs2, gps);
                if (READ_ONCE(rcu_cpu_stall_ftrace_dump))
                        rcu_ftrace_dump(DUMP_ALL);
+               didstall = true;
+       }
+       if (didstall && READ_ONCE(rcu_state.jiffies_stall) == jn) {
+               jn = jiffies + 3 * rcu_jiffies_till_stall_check() + 3;
+               WRITE_ONCE(rcu_state.jiffies_stall, jn);
        }
 }
 
@@ -742,7 +767,7 @@ bool rcu_check_boost_fail(unsigned long gp_state, int *cpup)
 
        rcu_for_each_leaf_node(rnp) {
                if (!cpup) {
-                       if (READ_ONCE(rnp->qsmask)) {
+                       if (data_race(READ_ONCE(rnp->qsmask))) {
                                return false;
                        } else {
                                if (READ_ONCE(rnp->gp_tasks))
@@ -791,32 +816,34 @@ void show_rcu_gp_kthreads(void)
        struct task_struct *t = READ_ONCE(rcu_state.gp_kthread);
 
        j = jiffies;
-       ja = j - data_race(rcu_state.gp_activity);
-       jr = j - data_race(rcu_state.gp_req_activity);
-       js = j - data_race(rcu_state.gp_start);
-       jw = j - data_race(rcu_state.gp_wake_time);
+       ja = j - data_race(READ_ONCE(rcu_state.gp_activity));
+       jr = j - data_race(READ_ONCE(rcu_state.gp_req_activity));
+       js = j - data_race(READ_ONCE(rcu_state.gp_start));
+       jw = j - data_race(READ_ONCE(rcu_state.gp_wake_time));
        pr_info("%s: wait state: %s(%d) ->state: %#x ->rt_priority %u delta ->gp_start %lu ->gp_activity %lu ->gp_req_activity %lu ->gp_wake_time %lu ->gp_wake_seq %ld ->gp_seq %ld ->gp_seq_needed %ld ->gp_max %lu ->gp_flags %#x\n",
                rcu_state.name, gp_state_getname(rcu_state.gp_state),
-               rcu_state.gp_state, t ? t->__state : 0x1ffff, t ? t->rt_priority : 0xffU,
-               js, ja, jr, jw, (long)data_race(rcu_state.gp_wake_seq),
-               (long)data_race(rcu_state.gp_seq),
-               (long)data_race(rcu_get_root()->gp_seq_needed),
-               data_race(rcu_state.gp_max),
-               data_race(rcu_state.gp_flags));
+               data_race(READ_ONCE(rcu_state.gp_state)),
+               t ? data_race(READ_ONCE(t->__state)) : 0x1ffff, t ? t->rt_priority : 0xffU,
+               js, ja, jr, jw, (long)data_race(READ_ONCE(rcu_state.gp_wake_seq)),
+               (long)data_race(READ_ONCE(rcu_state.gp_seq)),
+               (long)data_race(READ_ONCE(rcu_get_root()->gp_seq_needed)),
+               data_race(READ_ONCE(rcu_state.gp_max)),
+               data_race(READ_ONCE(rcu_state.gp_flags)));
        rcu_for_each_node_breadth_first(rnp) {
                if (ULONG_CMP_GE(READ_ONCE(rcu_state.gp_seq), READ_ONCE(rnp->gp_seq_needed)) &&
-                   !data_race(rnp->qsmask) && !data_race(rnp->boost_tasks) &&
-                   !data_race(rnp->exp_tasks) && !data_race(rnp->gp_tasks))
+                   !data_race(READ_ONCE(rnp->qsmask)) && !data_race(READ_ONCE(rnp->boost_tasks)) &&
+                   !data_race(READ_ONCE(rnp->exp_tasks)) && !data_race(READ_ONCE(rnp->gp_tasks)))
                        continue;
                pr_info("\trcu_node %d:%d ->gp_seq %ld ->gp_seq_needed %ld ->qsmask %#lx %c%c%c%c ->n_boosts %ld\n",
                        rnp->grplo, rnp->grphi,
-                       (long)data_race(rnp->gp_seq), (long)data_race(rnp->gp_seq_needed),
-                       data_race(rnp->qsmask),
-                       ".b"[!!data_race(rnp->boost_kthread_task)],
-                       ".B"[!!data_race(rnp->boost_tasks)],
-                       ".E"[!!data_race(rnp->exp_tasks)],
-                       ".G"[!!data_race(rnp->gp_tasks)],
-                       data_race(rnp->n_boosts));
+                       (long)data_race(READ_ONCE(rnp->gp_seq)),
+                       (long)data_race(READ_ONCE(rnp->gp_seq_needed)),
+                       data_race(READ_ONCE(rnp->qsmask)),
+                       ".b"[!!data_race(READ_ONCE(rnp->boost_kthread_task))],
+                       ".B"[!!data_race(READ_ONCE(rnp->boost_tasks))],
+                       ".E"[!!data_race(READ_ONCE(rnp->exp_tasks))],
+                       ".G"[!!data_race(READ_ONCE(rnp->gp_tasks))],
+                       data_race(READ_ONCE(rnp->n_boosts)));
                if (!rcu_is_leaf_node(rnp))
                        continue;
                for_each_leaf_node_possible_cpu(rnp, cpu) {
@@ -826,12 +853,12 @@ void show_rcu_gp_kthreads(void)
                                         READ_ONCE(rdp->gp_seq_needed)))
                                continue;
                        pr_info("\tcpu %d ->gp_seq_needed %ld\n",
-                               cpu, (long)data_race(rdp->gp_seq_needed));
+                               cpu, (long)data_race(READ_ONCE(rdp->gp_seq_needed)));
                }
        }
        for_each_possible_cpu(cpu) {
                rdp = per_cpu_ptr(&rcu_data, cpu);
-               cbs += data_race(rdp->n_cbs_invoked);
+               cbs += data_race(READ_ONCE(rdp->n_cbs_invoked));
                if (rcu_segcblist_is_offloaded(&rdp->cblist))
                        show_rcu_nocb_state(rdp);
        }
@@ -913,11 +940,11 @@ void rcu_fwd_progress_check(unsigned long j)
 
        if (rcu_gp_in_progress()) {
                pr_info("%s: GP age %lu jiffies\n",
-                       __func__, jiffies - rcu_state.gp_start);
+                       __func__, jiffies - data_race(READ_ONCE(rcu_state.gp_start)));
                show_rcu_gp_kthreads();
        } else {
                pr_info("%s: Last GP end %lu jiffies ago\n",
-                       __func__, jiffies - rcu_state.gp_end);
+                       __func__, jiffies - data_race(READ_ONCE(rcu_state.gp_end)));
                preempt_disable();
                rdp = this_cpu_ptr(&rcu_data);
                rcu_check_gp_start_stall(rdp->mynode, rdp, j);
index 29e8fc5..64a0828 100644 (file)
@@ -64,6 +64,7 @@ torture_param(bool, use_cpus_read_lock, 0, "Use cpus_read_lock() to exclude CPU
 torture_param(int, verbose, 0, "Enable verbose debugging printk()s");
 torture_param(int, weight_resched, -1, "Testing weight for resched_cpu() operations.");
 torture_param(int, weight_single, -1, "Testing weight for single-CPU no-wait operations.");
+torture_param(int, weight_single_rpc, -1, "Testing weight for single-CPU RPC operations.");
 torture_param(int, weight_single_wait, -1, "Testing weight for single-CPU operations.");
 torture_param(int, weight_many, -1, "Testing weight for multi-CPU no-wait operations.");
 torture_param(int, weight_many_wait, -1, "Testing weight for multi-CPU operations.");
@@ -86,6 +87,8 @@ struct scf_statistics {
        long long n_resched;
        long long n_single;
        long long n_single_ofl;
+       long long n_single_rpc;
+       long long n_single_rpc_ofl;
        long long n_single_wait;
        long long n_single_wait_ofl;
        long long n_many;
@@ -101,14 +104,17 @@ static DEFINE_PER_CPU(long long, scf_invoked_count);
 // Data for random primitive selection
 #define SCF_PRIM_RESCHED       0
 #define SCF_PRIM_SINGLE                1
-#define SCF_PRIM_MANY          2
-#define SCF_PRIM_ALL           3
-#define SCF_NPRIMS             7 // Need wait and no-wait versions of each,
-                                 //  except for SCF_PRIM_RESCHED.
+#define SCF_PRIM_SINGLE_RPC    2
+#define SCF_PRIM_MANY          3
+#define SCF_PRIM_ALL           4
+#define SCF_NPRIMS             8 // Need wait and no-wait versions of each,
+                                 //  except for SCF_PRIM_RESCHED and
+                                 //  SCF_PRIM_SINGLE_RPC.
 
 static char *scf_prim_name[] = {
        "resched_cpu",
        "smp_call_function_single",
+       "smp_call_function_single_rpc",
        "smp_call_function_many",
        "smp_call_function",
 };
@@ -128,6 +134,8 @@ struct scf_check {
        bool scfc_out;
        int scfc_cpu; // -1 for not _single().
        bool scfc_wait;
+       bool scfc_rpc;
+       struct completion scfc_completion;
 };
 
 // Use to wait for all threads to start.
@@ -158,6 +166,7 @@ static void scf_torture_stats_print(void)
                scfs.n_resched += scf_stats_p[i].n_resched;
                scfs.n_single += scf_stats_p[i].n_single;
                scfs.n_single_ofl += scf_stats_p[i].n_single_ofl;
+               scfs.n_single_rpc += scf_stats_p[i].n_single_rpc;
                scfs.n_single_wait += scf_stats_p[i].n_single_wait;
                scfs.n_single_wait_ofl += scf_stats_p[i].n_single_wait_ofl;
                scfs.n_many += scf_stats_p[i].n_many;
@@ -168,9 +177,10 @@ static void scf_torture_stats_print(void)
        if (atomic_read(&n_errs) || atomic_read(&n_mb_in_errs) ||
            atomic_read(&n_mb_out_errs) || atomic_read(&n_alloc_errs))
                bangstr = "!!! ";
-       pr_alert("%s %sscf_invoked_count %s: %lld resched: %lld single: %lld/%lld single_ofl: %lld/%lld many: %lld/%lld all: %lld/%lld ",
+       pr_alert("%s %sscf_invoked_count %s: %lld resched: %lld single: %lld/%lld single_ofl: %lld/%lld single_rpc: %lld single_rpc_ofl: %lld many: %lld/%lld all: %lld/%lld ",
                 SCFTORT_FLAG, bangstr, isdone ? "VER" : "ver", invoked_count, scfs.n_resched,
                 scfs.n_single, scfs.n_single_wait, scfs.n_single_ofl, scfs.n_single_wait_ofl,
+                scfs.n_single_rpc, scfs.n_single_rpc_ofl,
                 scfs.n_many, scfs.n_many_wait, scfs.n_all, scfs.n_all_wait);
        torture_onoff_stats();
        pr_cont("ste: %d stnmie: %d stnmoe: %d staf: %d\n", atomic_read(&n_errs),
@@ -282,10 +292,13 @@ static void scf_handler(void *scfc_in)
 out:
        if (unlikely(!scfcp))
                return;
-       if (scfcp->scfc_wait)
+       if (scfcp->scfc_wait) {
                WRITE_ONCE(scfcp->scfc_out, true);
-       else
+               if (scfcp->scfc_rpc)
+                       complete(&scfcp->scfc_completion);
+       } else {
                kfree(scfcp);
+       }
 }
 
 // As above, but check for correct CPU.
@@ -319,6 +332,7 @@ static void scftorture_invoke_one(struct scf_statistics *scfp, struct torture_ra
                        scfcp->scfc_cpu = -1;
                        scfcp->scfc_wait = scfsp->scfs_wait;
                        scfcp->scfc_out = false;
+                       scfcp->scfc_rpc = false;
                }
        }
        switch (scfsp->scfs_prim) {
@@ -350,6 +364,34 @@ static void scftorture_invoke_one(struct scf_statistics *scfp, struct torture_ra
                        scfcp = NULL;
                }
                break;
+       case SCF_PRIM_SINGLE_RPC:
+               if (!scfcp)
+                       break;
+               cpu = torture_random(trsp) % nr_cpu_ids;
+               scfp->n_single_rpc++;
+               scfcp->scfc_cpu = cpu;
+               scfcp->scfc_wait = true;
+               init_completion(&scfcp->scfc_completion);
+               scfcp->scfc_rpc = true;
+               barrier(); // Prevent race-reduction compiler optimizations.
+               scfcp->scfc_in = true;
+               ret = smp_call_function_single(cpu, scf_handler_1, (void *)scfcp, 0);
+               if (!ret) {
+                       if (use_cpus_read_lock)
+                               cpus_read_unlock();
+                       else
+                               preempt_enable();
+                       wait_for_completion(&scfcp->scfc_completion);
+                       if (use_cpus_read_lock)
+                               cpus_read_lock();
+                       else
+                               preempt_disable();
+               } else {
+                       scfp->n_single_rpc_ofl++;
+                       kfree(scfcp);
+                       scfcp = NULL;
+               }
+               break;
        case SCF_PRIM_MANY:
                if (scfsp->scfs_wait)
                        scfp->n_many_wait++;
@@ -379,10 +421,12 @@ static void scftorture_invoke_one(struct scf_statistics *scfp, struct torture_ra
        }
        if (scfcp && scfsp->scfs_wait) {
                if (WARN_ON_ONCE((num_online_cpus() > 1 || scfsp->scfs_prim == SCF_PRIM_SINGLE) &&
-                                !scfcp->scfc_out))
+                                !scfcp->scfc_out)) {
+                       pr_warn("%s: Memory-ordering failure, scfs_prim: %d.\n", __func__, scfsp->scfs_prim);
                        atomic_inc(&n_mb_out_errs); // Leak rather than trash!
-               else
+               } else {
                        kfree(scfcp);
+               }
                barrier(); // Prevent race-reduction compiler optimizations.
        }
        if (use_cpus_read_lock)
@@ -453,8 +497,8 @@ static void
 scftorture_print_module_parms(const char *tag)
 {
        pr_alert(SCFTORT_FLAG
-                "--- %s:  verbose=%d holdoff=%d longwait=%d nthreads=%d onoff_holdoff=%d onoff_interval=%d shutdown_secs=%d stat_interval=%d stutter=%d use_cpus_read_lock=%d, weight_resched=%d, weight_single=%d, weight_single_wait=%d, weight_many=%d, weight_many_wait=%d, weight_all=%d, weight_all_wait=%d\n", tag,
-                verbose, holdoff, longwait, nthreads, onoff_holdoff, onoff_interval, shutdown, stat_interval, stutter, use_cpus_read_lock, weight_resched, weight_single, weight_single_wait, weight_many, weight_many_wait, weight_all, weight_all_wait);
+                "--- %s:  verbose=%d holdoff=%d longwait=%d nthreads=%d onoff_holdoff=%d onoff_interval=%d shutdown_secs=%d stat_interval=%d stutter=%d use_cpus_read_lock=%d, weight_resched=%d, weight_single=%d, weight_single_rpc=%d, weight_single_wait=%d, weight_many=%d, weight_many_wait=%d, weight_all=%d, weight_all_wait=%d\n", tag,
+                verbose, holdoff, longwait, nthreads, onoff_holdoff, onoff_interval, shutdown, stat_interval, stutter, use_cpus_read_lock, weight_resched, weight_single, weight_single_rpc, weight_single_wait, weight_many, weight_many_wait, weight_all, weight_all_wait);
 }
 
 static void scf_cleanup_handler(void *unused)
@@ -469,7 +513,7 @@ static void scf_torture_cleanup(void)
                return;
 
        WRITE_ONCE(scfdone, true);
-       if (nthreads)
+       if (nthreads && scf_stats_p)
                for (i = 0; i < nthreads; i++)
                        torture_stop_kthread("scftorture_invoker", scf_stats_p[i].task);
        else
@@ -497,6 +541,7 @@ static int __init scf_torture_init(void)
        int firsterr = 0;
        unsigned long weight_resched1 = weight_resched;
        unsigned long weight_single1 = weight_single;
+       unsigned long weight_single_rpc1 = weight_single_rpc;
        unsigned long weight_single_wait1 = weight_single_wait;
        unsigned long weight_many1 = weight_many;
        unsigned long weight_many_wait1 = weight_many_wait;
@@ -508,11 +553,13 @@ static int __init scf_torture_init(void)
 
        scftorture_print_module_parms("Start of test");
 
-       if (weight_resched == -1 && weight_single == -1 && weight_single_wait == -1 &&
+       if (weight_resched == -1 &&
+           weight_single == -1 && weight_single_rpc == -1 && weight_single_wait == -1 &&
            weight_many == -1 && weight_many_wait == -1 &&
            weight_all == -1 && weight_all_wait == -1) {
                weight_resched1 = 2 * nr_cpu_ids;
                weight_single1 = 2 * nr_cpu_ids;
+               weight_single_rpc1 = 2 * nr_cpu_ids;
                weight_single_wait1 = 2 * nr_cpu_ids;
                weight_many1 = 2;
                weight_many_wait1 = 2;
@@ -523,6 +570,8 @@ static int __init scf_torture_init(void)
                        weight_resched1 = 0;
                if (weight_single == -1)
                        weight_single1 = 0;
+               if (weight_single_rpc == -1)
+                       weight_single_rpc1 = 0;
                if (weight_single_wait == -1)
                        weight_single_wait1 = 0;
                if (weight_many == -1)
@@ -534,7 +583,7 @@ static int __init scf_torture_init(void)
                if (weight_all_wait == -1)
                        weight_all_wait1 = 0;
        }
-       if (weight_single1 == 0 && weight_single_wait1 == 0 &&
+       if (weight_single1 == 0 && weight_single_rpc1 == 0 && weight_single_wait1 == 0 &&
            weight_many1 == 0 && weight_many_wait1 == 0 &&
            weight_all1 == 0 && weight_all_wait1 == 0) {
                VERBOSE_SCFTORTOUT_ERRSTRING("all zero weights makes no sense");
@@ -546,6 +595,7 @@ static int __init scf_torture_init(void)
        else if (weight_resched1)
                VERBOSE_SCFTORTOUT_ERRSTRING("built as module, weight_resched ignored");
        scf_sel_add(weight_single1, SCF_PRIM_SINGLE, false);
+       scf_sel_add(weight_single_rpc1, SCF_PRIM_SINGLE_RPC, true);
        scf_sel_add(weight_single_wait1, SCF_PRIM_SINGLE, true);
        scf_sel_add(weight_many1, SCF_PRIM_MANY, false);
        scf_sel_add(weight_many_wait1, SCF_PRIM_MANY, true);
index f3b27c6..2b9ed11 100644 (file)
@@ -7849,6 +7849,17 @@ int __sched __cond_resched(void)
                preempt_schedule_common();
                return 1;
        }
+       /*
+        * In preemptible kernels, ->rcu_read_lock_nesting tells the tick
+        * whether the current CPU is in an RCU read-side critical section,
+        * so the tick can report quiescent states even for CPUs looping
+        * in kernel context.  In contrast, in non-preemptible kernels,
+        * RCU readers leave no in-memory hints, which means that CPU-bound
+        * processes executing in kernel context might never report an
+        * RCU quiescent state.  Therefore, the following code causes
+        * cond_resched() to report a quiescent state, but only when RCU
+        * is in urgent need of one.
+        */
 #ifndef CONFIG_PREEMPT_RCU
        rcu_all_qs();
 #endif
index 0a315c3..bb8f411 100644 (file)
@@ -521,11 +521,11 @@ static void torture_shuffle_tasks(void)
        struct shuffle_task *stp;
 
        cpumask_setall(shuffle_tmp_mask);
-       get_online_cpus();
+       cpus_read_lock();
 
        /* No point in shuffling if there is only one online CPU (ex: UP) */
        if (num_online_cpus() == 1) {
-               put_online_cpus();
+               cpus_read_unlock();
                return;
        }
 
@@ -541,7 +541,7 @@ static void torture_shuffle_tasks(void)
                set_cpus_allowed_ptr(stp->st_t, shuffle_tmp_mask);
        mutex_unlock(&shuffle_task_mutex);
 
-       put_online_cpus();
+       cpus_read_unlock();
 }
 
 /* Shuffle tasks across CPUs, with the intent of allowing each CPU in the
index 8b7a983..3430667 100644 (file)
@@ -1031,7 +1031,7 @@ struct sys_stat_struct {
  *     scall32-o32.S in the kernel sources.
  *   - the system call is performed by calling "syscall"
  *   - syscall return comes in v0, and register a3 needs to be checked to know
- *     if an error occured, in which case errno is in v0.
+ *     if an error occurred, in which case errno is in v0.
  *   - the arguments are cast to long and assigned into the target registers
  *     which are then simply passed as registers to the asm code, so that we
  *     don't have to experience issues with register constraints.
@@ -2243,6 +2243,19 @@ unsigned int sleep(unsigned int seconds)
                return 0;
 }
 
+static __attribute__((unused))
+int msleep(unsigned int msecs)
+{
+       struct timeval my_timeval = { msecs / 1000, (msecs % 1000) * 1000 };
+
+       if (sys_select(0, 0, 0, 0, &my_timeval) < 0)
+               return (my_timeval.tv_sec * 1000) +
+                       (my_timeval.tv_usec / 1000) +
+                       !!(my_timeval.tv_usec % 1000);
+       else
+               return 0;
+}
+
 static __attribute__((unused))
 int stat(const char *path, struct stat *buf)
 {
index 15d937b..fd1ffaa 100755 (executable)
@@ -68,16 +68,12 @@ do
        cpumask=`awk -v cpus="$cpus" -v me=$me -v n=$n 'BEGIN {
                srand(n + me + systime());
                ncpus = split(cpus, ca);
-               curcpu = ca[int(rand() * ncpus + 1)];
-               z = "";
-               for (i = 1; 4 * i <= curcpu; i++)
-                       z = z "0";
-               print "0x" 2 ^ (curcpu % 4) z;
+               print ca[int(rand() * ncpus + 1)];
        }' < /dev/null`
        n=$(($n+1))
-       if ! taskset -p $cpumask $$ > /dev/null 2>&1
+       if ! taskset -c -p $cpumask $$ > /dev/null 2>&1
        then
-               echo taskset failure: '"taskset -p ' $cpumask $$ '"'
+               echo taskset failure: '"taskset -c -p ' $cpumask $$ '"'
                exit 1
        fi
 
index e5cc6b2..1af5d6b 100755 (executable)
@@ -14,7 +14,7 @@ if test -z "$TORTURE_KCONFIG_KCSAN_ARG"
 then
        exit 0
 fi
-cat $1/*/console.log |
+find $1 -name console.log -exec cat {} \; |
        grep "BUG: KCSAN: " |
        sed -e 's/^\[[^]]*] //' |
        sort |
index d8c8483..5a0023d 100755 (executable)
@@ -142,7 +142,7 @@ then
        echo "Cannot copy from $oldrun to $rundir."
        usage
 fi
-rm -f "$rundir"/*/{console.log,console.log.diags,qemu_pid,qemu-retval,Warnings,kvm-test-1-run.sh.out,kvm-test-1-run-qemu.sh.out,vmlinux} "$rundir"/log
+rm -f "$rundir"/*/{console.log,console.log.diags,qemu_pid,qemu-pid,qemu-retval,Warnings,kvm-test-1-run.sh.out,kvm-test-1-run-qemu.sh.out,vmlinux} "$rundir"/log
 touch "$rundir/log"
 echo $scriptname $args | tee -a "$rundir/log"
 echo $oldrun > "$rundir/re-run"
@@ -179,6 +179,6 @@ if test -n "$dryrun"
 then
        echo ---- Dryrun complete, directory: $rundir | tee -a "$rundir/log"
 else
-       ( cd "$rundir"; sh $T/runbatches.sh )
+       ( cd "$rundir"; sh $T/runbatches.sh ) | tee -a "$rundir/log"
        kvm-end-run-stats.sh "$rundir" "$starttime"
 fi
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-assign-cpus.sh b/tools/testing/selftests/rcutorture/bin/kvm-assign-cpus.sh
new file mode 100755 (executable)
index 0000000..f99b2c1
--- /dev/null
@@ -0,0 +1,106 @@
+#!/bin/sh
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Produce awk statements roughly depicting the system's CPU and cache
+# layout.  If the required information is not available, produce
+# error messages as awk comments.  Successful exit regardless.
+#
+# Usage: kvm-assign-cpus.sh /path/to/sysfs
+
+T=/tmp/kvm-assign-cpus.sh.$$
+trap 'rm -rf $T' 0 2
+mkdir $T
+
+sysfsdir=${1-/sys/devices/system/node}
+if ! cd "$sysfsdir" > $T/msg 2>&1
+then
+       sed -e 's/^/# /' < $T/msg
+       exit 0
+fi
+nodelist="`ls -d node*`"
+for i in node*
+do
+       if ! test -d $i/
+       then
+               echo "# Not a directory: $sysfsdir/node*"
+               exit 0
+       fi
+       for j in $i/cpu*/cache/index*
+       do
+               if ! test -d $j/
+               then
+                       echo "# Not a directory: $sysfsdir/$j"
+                       exit 0
+               else
+                       break
+               fi
+       done
+       indexlist="`ls -d $i/cpu* | grep 'cpu[0-9][0-9]*' | head -1 | sed -e 's,^.*$,ls -d &/cache/index*,' | sh | sed -e 's,^.*/,,'`"
+       break
+done
+for i in node*/cpu*/cache/index*/shared_cpu_list
+do
+       if ! test -f $i
+       then
+               echo "# Not a file: $sysfsdir/$i"
+               exit 0
+       else
+               break
+       fi
+done
+firstshared=
+for i in $indexlist
+do
+       rm -f $T/cpulist
+       for n in node*
+       do
+               f="$n/cpu*/cache/$i/shared_cpu_list"
+               if ! cat $f > $T/msg 2>&1
+               then
+                       sed -e 's/^/# /' < $T/msg
+                       exit 0
+               fi
+               cat $f >> $T/cpulist
+       done
+       if grep -q '[-,]' $T/cpulist
+       then
+               if test -z "$firstshared"
+               then
+                       firstshared="$i"
+               fi
+       fi
+done
+if test -z "$firstshared"
+then
+       splitindex="`echo $indexlist | sed -e 's/ .*$//'`"
+else
+       splitindex="$firstshared"
+fi
+nodenum=0
+for n in node*
+do
+       cat $n/cpu*/cache/$splitindex/shared_cpu_list | sort -u -k1n |
+       awk -v nodenum="$nodenum" '
+       BEGIN {
+               idx = 0;
+       }
+
+       {
+               nlists = split($0, cpulists, ",");
+               for (i = 1; i <= nlists; i++) {
+                       listsize = split(cpulists[i], cpus, "-");
+                       if (listsize == 1)
+                               cpus[2] = cpus[1];
+                       for (j = cpus[1]; j <= cpus[2]; j++) {
+                               print "cpu[" nodenum "][" idx "] = " j ";";
+                               idx++;
+                       }
+               }
+       }
+
+       END {
+               print "nodecpus[" nodenum "] = " idx ";";
+       }'
+       nodenum=`expr $nodenum + 1`
+done
+echo "numnodes = $nodenum;"
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-get-cpus-script.sh b/tools/testing/selftests/rcutorture/bin/kvm-get-cpus-script.sh
new file mode 100755 (executable)
index 0000000..20c7c53
--- /dev/null
@@ -0,0 +1,88 @@
+#!/bin/sh
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Create an awk script that takes as input numbers of CPUs and outputs
+# lists of CPUs, one per line in both cases.
+#
+# Usage: kvm-get-cpus-script.sh /path/to/cpu/arrays /path/to/put/script [ /path/to/state ]
+#
+# The CPU arrays are output by kvm-assign-cpus.sh, and are valid awk
+# statements initializing the variables describing the system's topology.
+#
+# The optional state is input by this script (if the file exists and is
+# non-empty), and can also be output by this script.
+
+cpuarrays="${1-/sys/devices/system/node}"
+scriptfile="${2}"
+statefile="${3}"
+
+if ! test -f "$cpuarrays"
+then
+       echo "File not found: $cpuarrays" 1>&2
+       exit 1
+fi
+scriptdir="`dirname "$scriptfile"`"
+if ! test -d "$scriptdir" || ! test -x "$scriptdir" || ! test -w "$scriptdir"
+then
+       echo "Directory not usable for script output: $scriptdir"
+       exit 1
+fi
+
+cat << '___EOF___' > "$scriptfile"
+BEGIN {
+___EOF___
+cat "$cpuarrays" >> "$scriptfile"
+if test -r "$statefile"
+then
+       cat "$statefile" >> "$scriptfile"
+fi
+cat << '___EOF___' >> "$scriptfile"
+}
+
+# Do we have the system architecture to guide CPU affinity?
+function gotcpus()
+{
+       return numnodes != "";
+}
+
+# Return a comma-separated list of the next n CPUs.
+function nextcpus(n,  i, s)
+{
+       for (i = 0; i < n; i++) {
+               if (nodecpus[curnode] == "")
+                       curnode = 0;
+               if (cpu[curnode][curcpu[curnode]] == "")
+                       curcpu[curnode] = 0;
+               if (s != "")
+                       s = s ",";
+               s = s cpu[curnode][curcpu[curnode]];
+               curcpu[curnode]++;
+               curnode++
+       }
+       return s;
+}
+
+# Dump out the current node/CPU state so that a later invocation of this
+# script can continue where this one left off.  Of course, this only works
+# when a state file was specified and where there was valid sysfs state.
+# Returns 1 if the state was dumped, 0 otherwise.
+#
+# Dumping the state for one system configuration and loading it into
+# another isn't likely to do what you want, whatever that might be.
+function dumpcpustate(  i, fn)
+{
+___EOF___
+echo ' fn = "'"$statefile"'";' >> $scriptfile
+cat << '___EOF___' >> "$scriptfile"
+       if (fn != "" && gotcpus()) {
+               print "curnode = " curnode ";" > fn;
+               for (i = 0; i < numnodes; i++)
+                       if (curcpu[i] != "")
+                               print "curcpu[" i "] = " curcpu[i] ";" >> fn;
+               return 1;
+       }
+       if (fn != "")
+               print "# No CPU state to dump." > fn;
+       return 0;
+}
+___EOF___
index f3a7a5e..db2c0e2 100755 (executable)
@@ -25,7 +25,7 @@ then
        echo "$configfile -------"
 else
        title="$configfile ------- $ncs acquisitions/releases"
-       dur=`sed -e 's/^.* locktorture.shutdown_secs=//' -e 's/ .*$//' < $i/qemu-cmd 2> /dev/null`
+       dur=`grep -v '^#' $i/qemu-cmd | sed -e 's/^.* locktorture.shutdown_secs=//' -e 's/ .*$//' 2> /dev/null`
        if test -z "$dur"
        then
                :
index 671bfee..3afa5c6 100755 (executable)
@@ -25,7 +25,7 @@ if test -z "$nscfs"
 then
        echo "$configfile ------- "
 else
-       dur="`sed -e 's/^.* scftorture.shutdown_secs=//' -e 's/ .*$//' < $i/qemu-cmd 2> /dev/null`"
+       dur="`grep -v '^#' $i/qemu-cmd | sed -e 's/^.* scftorture.shutdown_secs=//' -e 's/ .*$//' 2> /dev/null`"
        if test -z "$dur"
        then
                rate=""
index e01b31b..0a54199 100755 (executable)
@@ -74,7 +74,10 @@ do
        done
        if test -f "$rd/kcsan.sum"
        then
-               if grep -q CONFIG_KCSAN=y $T
+               if ! test -f $T
+               then
+                       :
+               elif grep -q CONFIG_KCSAN=y $T
                then
                        echo "Compiler or architecture does not support KCSAN!"
                        echo Did you forget to switch your compiler with '--kmake-arg CC=<cc-that-supports-kcsan>'?
diff --git a/tools/testing/selftests/rcutorture/bin/kvm-remote-noreap.sh b/tools/testing/selftests/rcutorture/bin/kvm-remote-noreap.sh
new file mode 100755 (executable)
index 0000000..014ce68
--- /dev/null
@@ -0,0 +1,30 @@
+#!/bin/bash
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Periodically scan a directory tree to prevent files from being reaped
+# by systemd and friends on long runs.
+#
+# Usage: kvm-remote-noreap.sh pathname
+#
+# Copyright (C) 2021 Facebook, Inc.
+#
+# Authors: Paul E. McKenney <paulmck@kernel.org>
+
+pathname="$1"
+if test "$pathname" = ""
+then
+       echo Usage: kvm-remote-noreap.sh pathname
+       exit 1
+fi
+if ! test -d "$pathname"
+then
+       echo  Usage: kvm-remote-noreap.sh pathname
+       echo "       pathname must be a directory."
+       exit 2
+fi
+
+while test -d "$pathname"
+do
+       find "$pathname" -type f -exec touch -c {} \; > /dev/null 2>&1
+       sleep 30
+done
index 79e680e..03126eb 100755 (executable)
@@ -124,10 +124,12 @@ awk < "$rundir"/scenarios -v dest="$T/bin" -v rundir="$rundir" '
        n = $1;
        sub(/\./, "", n);
        fn = dest "/kvm-remote-" n ".sh"
+       print "kvm-remote-noreap.sh " rundir " &" > fn;
        scenarios = "";
        for (i = 2; i <= NF; i++)
                scenarios = scenarios " " $i;
-       print "kvm-test-1-run-batch.sh" scenarios > fn;
+       print "kvm-test-1-run-batch.sh" scenarios >> fn;
+       print "sync" >> fn;
        print "rm " rundir "/remote.run" >> fn;
 }'
 chmod +x $T/bin/kvm-remote-*.sh
@@ -172,11 +174,20 @@ checkremotefile () {
        do
                ssh $1 "test -f \"$2\""
                ret=$?
-               if test "$ret" -ne 255
+               if test "$ret" -eq 255
                then
+                       echo " ---" ssh failure to $1 checking for file $2, retry after $sleeptime seconds. `date`
+               elif test "$ret" -eq 0
+               then
+                       return 0
+               elif test "$ret" -eq 1
+               then
+                       echo " ---" File \"$2\" not found: ssh $1 test -f \"$2\"
+                       return 1
+               else
+                       echo " ---" Exit code $ret: ssh $1 test -f \"$2\", retry after $sleeptime seconds. `date`
                        return $ret
                fi
-               echo " ---" ssh failure to $1 checking for file $2, retry after $sleeptime seconds. `date`
                sleep $sleeptime
        done
 }
@@ -242,7 +253,8 @@ do
        do
                sleep 30
        done
-       ( cd "$oldrun"; ssh $i "cd $rundir; tar -czf - kvm-remote-*.sh.out */console.log */kvm-test-1-run*.sh.out */qemu_pid */qemu-retval; rm -rf $T > /dev/null 2>&1" | tar -xzf - )
+       echo " ---" Collecting results from $i `date`
+       ( cd "$oldrun"; ssh $i "cd $rundir; tar -czf - kvm-remote-*.sh.out */console.log */kvm-test-1-run*.sh.out */qemu[_-]pid */qemu-retval */qemu-affinity; rm -rf $T > /dev/null 2>&1" | tar -xzf - )
 done
 
 ( kvm-end-run-stats.sh "$oldrun" "$starttime"; echo $? > $T/exitcode ) | tee -a "$oldrun/remote-log"
index 7ea0809..1e29d65 100755 (executable)
@@ -50,10 +50,34 @@ grep '^#' $1/qemu-cmd | sed -e 's/^# //' > $T/qemu-cmd-settings
 echo ---- System running test: `uname -a`
 echo ---- Starting kernels. `date` | tee -a log
 $TORTURE_JITTER_START
+kvm-assign-cpus.sh /sys/devices/system/node > $T/cpuarray.awk
 for i in "$@"
 do
        echo ---- System running test: `uname -a` > $i/kvm-test-1-run-qemu.sh.out
        echo > $i/kvm-test-1-run-qemu.sh.out
+       export TORTURE_AFFINITY=
+       kvm-get-cpus-script.sh $T/cpuarray.awk $T/cpubatches.awk $T/cpustate
+       cat << '        ___EOF___' >> $T/cpubatches.awk
+       END {
+               affinitylist = "";
+               if (!gotcpus()) {
+                       print "echo No CPU-affinity information, so no taskset command.";
+               } else if (cpu_count !~ /^[0-9][0-9]*$/) {
+                       print "echo " scenario ": Bogus number of CPUs (old qemu-cmd?), so no taskset command.";
+               } else {
+                       affinitylist = nextcpus(cpu_count);
+                       if (!(affinitylist ~ /^[0-9,-][0-9,-]*$/))
+                               print "echo " scenario ": Bogus CPU-affinity information, so no taskset command.";
+                       else if (!dumpcpustate())
+                               print "echo " scenario ": Could not dump state, so no taskset command.";
+                       else
+                               print "export TORTURE_AFFINITY=" affinitylist;
+               }
+       }
+       ___EOF___
+       cpu_count="`grep '# TORTURE_CPU_COUNT=' $i/qemu-cmd | sed -e 's/^.*=//'`"
+       affinity_export="`awk -f $T/cpubatches.awk -v cpu_count="$cpu_count" -v scenario=$i < /dev/null`"
+       $affinity_export
        kvm-test-1-run-qemu.sh $i >> $i/kvm-test-1-run-qemu.sh.out 2>&1 &
 done
 for i in $runfiles
index 5b1aa2a..4428058 100755 (executable)
@@ -39,27 +39,34 @@ echo ' ---' `date`: Starting kernel, PID $$
 grep '^#' $resdir/qemu-cmd | sed -e 's/^# //' > $T/qemu-cmd-settings
 . $T/qemu-cmd-settings
 
-# Decorate qemu-cmd with redirection, backgrounding, and PID capture
-sed -e 's/$/ 2>\&1 \&/' < $resdir/qemu-cmd > $T/qemu-cmd
-echo 'echo $! > $resdir/qemu_pid' >> $T/qemu-cmd
+# Decorate qemu-cmd with affinity, redirection, backgrounding, and PID capture
+taskset_command=
+if test -n "$TORTURE_AFFINITY"
+then
+       taskset_command="taskset -c $TORTURE_AFFINITY "
+fi
+sed -e 's/^[^#].*$/'"$taskset_command"'& 2>\&1 \&/' < $resdir/qemu-cmd > $T/qemu-cmd
+echo 'qemu_pid=$!' >> $T/qemu-cmd
+echo 'echo $qemu_pid > $resdir/qemu-pid' >> $T/qemu-cmd
+echo 'taskset -c -p $qemu_pid > $resdir/qemu-affinity' >> $T/qemu-cmd
 
 # In case qemu refuses to run...
 echo "NOTE: $QEMU either did not run or was interactive" > $resdir/console.log
 
 # Attempt to run qemu
 kstarttime=`gawk 'BEGIN { print systime() }' < /dev/null`
-( . $T/qemu-cmd; wait `cat  $resdir/qemu_pid`; echo $? > $resdir/qemu-retval ) &
+( . $T/qemu-cmd; wait `cat  $resdir/qemu-pid`; echo $? > $resdir/qemu-retval ) &
 commandcompleted=0
 if test -z "$TORTURE_KCONFIG_GDB_ARG"
 then
        sleep 10 # Give qemu's pid a chance to reach the file
-       if test -s "$resdir/qemu_pid"
+       if test -s "$resdir/qemu-pid"
        then
-               qemu_pid=`cat "$resdir/qemu_pid"`
-               echo Monitoring qemu job at pid $qemu_pid
+               qemu_pid=`cat "$resdir/qemu-pid"`
+               echo Monitoring qemu job at pid $qemu_pid `date`
        else
                qemu_pid=""
-               echo Monitoring qemu job at yet-as-unknown pid
+               echo Monitoring qemu job at yet-as-unknown pid `date`
        fi
 fi
 if test -n "$TORTURE_KCONFIG_GDB_ARG"
@@ -82,9 +89,9 @@ then
 fi
 while :
 do
-       if test -z "$qemu_pid" -a -s "$resdir/qemu_pid"
+       if test -z "$qemu_pid" && test -s "$resdir/qemu-pid"
        then
-               qemu_pid=`cat "$resdir/qemu_pid"`
+               qemu_pid=`cat "$resdir/qemu-pid"`
        fi
        kruntime=`gawk 'BEGIN { print systime() - '"$kstarttime"' }' < /dev/null`
        if test -z "$qemu_pid" || kill -0 "$qemu_pid" > /dev/null 2>&1
@@ -115,22 +122,22 @@ do
                break
        fi
 done
-if test -z "$qemu_pid" -a -s "$resdir/qemu_pid"
+if test -z "$qemu_pid" && test -s "$resdir/qemu-pid"
 then
-       qemu_pid=`cat "$resdir/qemu_pid"`
+       qemu_pid=`cat "$resdir/qemu-pid"`
 fi
-if test $commandcompleted -eq 0 -a -n "$qemu_pid"
+if test $commandcompleted -eq 0 && test -n "$qemu_pid"
 then
        if ! test -f "$resdir/../STOP.1"
        then
-               echo Grace period for qemu job at pid $qemu_pid
+               echo Grace period for qemu job at pid $qemu_pid `date`
        fi
        oldline="`tail $resdir/console.log`"
        while :
        do
                if test -f "$resdir/../STOP.1"
                then
-                       echo "PID $qemu_pid killed due to run STOP.1 request" >> $resdir/Warnings 2>&1
+                       echo "PID $qemu_pid killed due to run STOP.1 request `date`" >> $resdir/Warnings 2>&1
                        kill -KILL $qemu_pid
                        break
                fi
@@ -152,13 +159,17 @@ then
                then
                        last_ts=0
                fi
-               if test "$newline" != "$oldline" -a "$last_ts" -lt $((seconds + $TORTURE_SHUTDOWN_GRACE))
+               if test "$newline" != "$oldline" && test "$last_ts" -lt $((seconds + $TORTURE_SHUTDOWN_GRACE)) && test "$last_ts" -gt "$TORTURE_SHUTDOWN_GRACE"
                then
                        must_continue=yes
+                       if test $kruntime -ge $((seconds + $TORTURE_SHUTDOWN_GRACE))
+                       then
+                               echo Continuing at console.log time $last_ts \"`tail -n 1 $resdir/console.log`\" `date`
+                       fi
                fi
-               if test $must_continue = no -a $kruntime -ge $((seconds + $TORTURE_SHUTDOWN_GRACE))
+               if test $must_continue = no && test $kruntime -ge $((seconds + $TORTURE_SHUTDOWN_GRACE))
                then
-                       echo "!!! PID $qemu_pid hung at $kruntime vs. $seconds seconds" >> $resdir/Warnings 2>&1
+                       echo "!!! PID $qemu_pid hung at $kruntime vs. $seconds seconds `date`" >> $resdir/Warnings 2>&1
                        kill -KILL $qemu_pid
                        break
                fi
@@ -172,5 +183,3 @@ fi
 
 # Tell the script that this run is done.
 rm -f $resdir/build.run
-
-parse-console.sh $resdir/console.log $title
index 420ed5c..f4c8055 100755 (executable)
@@ -205,6 +205,7 @@ echo "# TORTURE_KCONFIG_GDB_ARG=\"$TORTURE_KCONFIG_GDB_ARG\"" >> $resdir/qemu-cm
 echo "# TORTURE_JITTER_START=\"$TORTURE_JITTER_START\"" >> $resdir/qemu-cmd
 echo "# TORTURE_JITTER_STOP=\"$TORTURE_JITTER_STOP\"" >> $resdir/qemu-cmd
 echo "# TORTURE_TRUST_MAKE=\"$TORTURE_TRUST_MAKE\"; export TORTURE_TRUST_MAKE" >> $resdir/qemu-cmd
+echo "# TORTURE_CPU_COUNT=$cpu_count" >> $resdir/qemu-cmd
 
 if test -n "$TORTURE_BUILDONLY"
 then
@@ -214,3 +215,4 @@ then
 fi
 
 kvm-test-1-run-qemu.sh $resdir
+parse-console.sh $resdir/console.log $title
index b4ac4ee..f442d84 100755 (executable)
@@ -430,17 +430,10 @@ then
        git diff HEAD >> $resdir/$ds/testid.txt
 fi
 ___EOF___
-awk < $T/cfgcpu.pack \
-       -v TORTURE_BUILDONLY="$TORTURE_BUILDONLY" \
-       -v CONFIGDIR="$CONFIGFRAG/" \
-       -v KVM="$KVM" \
-       -v ncpus=$cpus \
-       -v jitter="$jitter" \
-       -v rd=$resdir/$ds/ \
-       -v dur=$dur \
-       -v TORTURE_QEMU_ARG="$TORTURE_QEMU_ARG" \
-       -v TORTURE_BOOTARGS="$TORTURE_BOOTARGS" \
-'BEGIN {
+kvm-assign-cpus.sh /sys/devices/system/node > $T/cpuarray.awk
+kvm-get-cpus-script.sh $T/cpuarray.awk $T/dumpbatches.awk
+cat << '___EOF___' >> $T/dumpbatches.awk
+BEGIN {
        i = 0;
 }
 
@@ -451,7 +444,7 @@ awk < $T/cfgcpu.pack \
 }
 
 # Dump out the scripting required to run one test batch.
-function dump(first, pastlast, batchnum)
+function dump(first, pastlast, batchnum,  affinitylist)
 {
        print "echo ----Start batch " batchnum ": `date` | tee -a " rd "log";
        print "needqemurun="
@@ -483,6 +476,14 @@ function dump(first, pastlast, batchnum)
                print "echo ", cfr[jn], cpusr[jn] ovf ": Starting build. `date` | tee -a " rd "log";
                print "mkdir " rd cfr[jn] " || :";
                print "touch " builddir ".wait";
+               affinitylist = "";
+               if (gotcpus()) {
+                       affinitylist = nextcpus(cpusr[jn]);
+               }
+               if (affinitylist ~ /^[0-9,-][0-9,-]*$/)
+                       print "export TORTURE_AFFINITY=" affinitylist;
+               else
+                       print "export TORTURE_AFFINITY=";
                print "kvm-test-1-run.sh " CONFIGDIR cf[j], rd cfr[jn], dur " \"" TORTURE_QEMU_ARG "\" \"" TORTURE_BOOTARGS "\" > " rd cfr[jn]  "/kvm-test-1-run.sh.out 2>&1 &"
                print "echo ", cfr[jn], cpusr[jn] ovf ": Waiting for build to complete. `date` | tee -a " rd "log";
                print "while test -f " builddir ".wait"
@@ -560,7 +561,19 @@ END {
        # Dump the last batch.
        if (ncpus != 0)
                dump(first, i, batchnum);
-}' >> $T/script
+}
+___EOF___
+awk < $T/cfgcpu.pack \
+       -v TORTURE_BUILDONLY="$TORTURE_BUILDONLY" \
+       -v CONFIGDIR="$CONFIGFRAG/" \
+       -v KVM="$KVM" \
+       -v ncpus=$cpus \
+       -v jitter="$jitter" \
+       -v rd=$resdir/$ds/ \
+       -v dur=$dur \
+       -v TORTURE_QEMU_ARG="$TORTURE_QEMU_ARG" \
+       -v TORTURE_BOOTARGS="$TORTURE_BOOTARGS" \
+       -f $T/dumpbatches.awk >> $T/script
 echo kvm-end-run-stats.sh "$resdir/$ds" "$starttime" >> $T/script
 
 # Extract the tests and their batches from the script.
index 53ec7c0..363f560 100755 (executable)
@@ -53,6 +53,7 @@ do_refscale=yes
 do_kvfree=yes
 do_kasan=yes
 do_kcsan=no
+do_clocksourcewd=yes
 
 # doyesno - Helper function for yes/no arguments
 function doyesno () {
@@ -72,6 +73,7 @@ usage () {
        echo "       --configs-scftorture \"config-file list w/ repeat factor (2*CFLIST)\""
        echo "       --doall"
        echo "       --doallmodconfig / --do-no-allmodconfig"
+       echo "       --do-clocksourcewd / --do-no-clocksourcewd"
        echo "       --do-kasan / --do-no-kasan"
        echo "       --do-kcsan / --do-no-kcsan"
        echo "       --do-kvfree / --do-no-kvfree"
@@ -109,7 +111,7 @@ do
                configs_scftorture="$configs_scftorture $2"
                shift
                ;;
-       --doall)
+       --do-all|--doall)
                do_allmodconfig=yes
                do_rcutorture=yes
                do_locktorture=yes
@@ -119,10 +121,14 @@ do
                do_kvfree=yes
                do_kasan=yes
                do_kcsan=yes
+               do_clocksourcewd=yes
                ;;
        --do-allmodconfig|--do-no-allmodconfig)
                do_allmodconfig=`doyesno "$1" --do-allmodconfig`
                ;;
+       --do-clocksourcewd|--do-no-clocksourcewd)
+               do_clocksourcewd=`doyesno "$1" --do-clocksourcewd`
+               ;;
        --do-kasan|--do-no-kasan)
                do_kasan=`doyesno "$1" --do-kasan`
                ;;
@@ -135,7 +141,7 @@ do
        --do-locktorture|--do-no-locktorture)
                do_locktorture=`doyesno "$1" --do-locktorture`
                ;;
-       --do-none)
+       --do-none|--donone)
                do_allmodconfig=no
                do_rcutorture=no
                do_locktorture=no
@@ -145,6 +151,7 @@ do
                do_kvfree=no
                do_kasan=no
                do_kcsan=no
+               do_clocksourcewd=no
                ;;
        --do-rcuscale|--do-no-rcuscale)
                do_rcuscale=`doyesno "$1" --do-rcuscale`
@@ -279,9 +286,9 @@ function torture_one {
 #      torture_bootargs="[ kernel boot arguments ]"
 #      torture_set flavor [ kvm.sh arguments ]
 #
-# Note that "flavor" is an arbitrary string.  Supply --torture if needed.
-# Note that quoting is problematic.  So on the command line, pass multiple
-# values with multiple kvm.sh argument instances.
+# Note that "flavor" is an arbitrary string that does not affect kvm.sh
+# in any way.  So also supply --torture if you need something other than
+# the default.
 function torture_set {
        local cur_kcsan_kmake_args=
        local kcsan_kmake_tag=
@@ -377,6 +384,22 @@ then
        torture_set "rcuscale-kvfree" tools/testing/selftests/rcutorture/bin/kvm.sh --torture rcuscale --allcpus --duration 10 --kconfig "CONFIG_NR_CPUS=$HALF_ALLOTED_CPUS" --memory 1G --trust-make
 fi
 
+if test "$do_clocksourcewd" = "yes"
+then
+       torture_bootargs="rcupdate.rcu_cpu_stall_suppress_at_boot=1 torture.disable_onoff_at_boot rcupdate.rcu_task_stall_timeout=30000"
+       torture_set "clocksourcewd-1" tools/testing/selftests/rcutorture/bin/kvm.sh --allcpus --duration 45s --configs TREE03 --kconfig "CONFIG_TEST_CLOCKSOURCE_WATCHDOG=y" --trust-make
+
+       torture_bootargs="rcupdate.rcu_cpu_stall_suppress_at_boot=1 torture.disable_onoff_at_boot rcupdate.rcu_task_stall_timeout=30000 clocksource.max_cswd_read_retries=1"
+       torture_set "clocksourcewd-2" tools/testing/selftests/rcutorture/bin/kvm.sh --allcpus --duration 45s --configs TREE03 --kconfig "CONFIG_TEST_CLOCKSOURCE_WATCHDOG=y" --trust-make
+
+       # In case our work is already done...
+       if test "$do_rcutorture" != "yes"
+       then
+               torture_bootargs="rcupdate.rcu_cpu_stall_suppress_at_boot=1 torture.disable_onoff_at_boot rcupdate.rcu_task_stall_timeout=30000"
+               torture_set "clocksourcewd-3" tools/testing/selftests/rcutorture/bin/kvm.sh --allcpus --duration 45s --configs TREE03 --trust-make
+       fi
+fi
+
 echo " --- " $scriptname $args
 echo " --- " Done `date` | tee -a $T/log
 ret=0
@@ -395,6 +418,10 @@ then
        nfailures="`wc -l "$T/failures" | awk '{ print $1 }'`"
        ret=2
 fi
+if test "$do_kcsan" = "yes"
+then
+       TORTURE_KCONFIG_KCSAN_ARG=1 tools/testing/selftests/rcutorture/bin/kcsan-collapse.sh tools/testing/selftests/rcutorture/res/$ds > tools/testing/selftests/rcutorture/res/$ds/kcsan.sum
+fi
 echo Started at $startdate, ended at `date`, duration `get_starttime_duration $starttime`. | tee -a $T/log
 echo Summary: Successes: $nsuccesses Failures: $nfailures. | tee -a $T/log
 tdir="`cat $T/successes $T/failures | head -1 | awk '{ print $NF }' | sed -e 's,/[^/]\+/*$,,'`"
index bafe94c..3ca1124 100644 (file)
@@ -1,5 +1,5 @@
 CONFIG_SMP=y
-CONFIG_NR_CPUS=2
+CONFIG_NR_CPUS=4
 CONFIG_HOTPLUG_CPU=y
 CONFIG_PREEMPT_NONE=n
 CONFIG_PREEMPT_VOLUNTARY=n
index bafe94c..3ca1124 100644 (file)
@@ -1,5 +1,5 @@
 CONFIG_SMP=y
-CONFIG_NR_CPUS=2
+CONFIG_NR_CPUS=4
 CONFIG_HOTPLUG_CPU=y
 CONFIG_PREEMPT_NONE=n
 CONFIG_PREEMPT_VOLUNTARY=n
index ea43990..dc02083 100644 (file)
@@ -1,5 +1,5 @@
 CONFIG_SMP=y
-CONFIG_NR_CPUS=2
+CONFIG_NR_CPUS=4
 CONFIG_PREEMPT_NONE=n
 CONFIG_PREEMPT_VOLUNTARY=n
 CONFIG_PREEMPT=y