Merge branches 'bitmaprange.2021.05.10c', 'doc.2021.05.10c', 'fixes.2021.05.13a'...
[linux-2.6-microblaze.git] / kernel / rcu / tree.c
index 8e78b24..28f1093 100644 (file)
@@ -186,6 +186,17 @@ module_param(rcu_unlock_delay, int, 0444);
 static int rcu_min_cached_objs = 5;
 module_param(rcu_min_cached_objs, int, 0444);
 
+// A page shrinker can ask for pages to be freed to make them
+// available for other parts of the system. This usually happens
+// under low memory conditions, and in that case we should also
+// defer page-cache filling for a short time period.
+//
+// The default value is 5 seconds, which is long enough to reduce
+// interference with the shrinker while it asks other systems to
+// drain their caches.
+static int rcu_delay_page_cache_fill_msec = 5000;
+module_param(rcu_delay_page_cache_fill_msec, int, 0444);
+
 /* Retrieve RCU kthreads priority for rcutorture */
 int rcu_get_gp_kthreads_prio(void)
 {
@@ -202,7 +213,7 @@ EXPORT_SYMBOL_GPL(rcu_get_gp_kthreads_prio);
  * the need for long delays to increase some race probabilities with the
  * need for fast grace periods to increase other race probabilities.
  */
-#define PER_RCU_NODE_PERIOD 3  /* Number of grace periods between delays. */
+#define PER_RCU_NODE_PERIOD 3  /* Number of grace periods between delays for debugging. */
 
 /*
  * Compute the mask of online CPUs for the specified rcu_node structure.
@@ -242,6 +253,7 @@ void rcu_softirq_qs(void)
 {
        rcu_qs();
        rcu_preempt_deferred_qs(current);
+       rcu_tasks_qs(current, false);
 }
 
 /*
@@ -833,28 +845,6 @@ void noinstr rcu_irq_exit(void)
        rcu_nmi_exit();
 }
 
-/**
- * rcu_irq_exit_preempt - Inform RCU that current CPU is exiting irq
- *                       towards in kernel preemption
- *
- * Same as rcu_irq_exit() but has a sanity check that scheduling is safe
- * from RCU point of view. Invoked from return from interrupt before kernel
- * preemption.
- */
-void rcu_irq_exit_preempt(void)
-{
-       lockdep_assert_irqs_disabled();
-       rcu_nmi_exit();
-
-       RCU_LOCKDEP_WARN(__this_cpu_read(rcu_data.dynticks_nesting) <= 0,
-                        "RCU dynticks_nesting counter underflow/zero!");
-       RCU_LOCKDEP_WARN(__this_cpu_read(rcu_data.dynticks_nmi_nesting) !=
-                        DYNTICK_IRQ_NONIDLE,
-                        "Bad RCU  dynticks_nmi_nesting counter\n");
-       RCU_LOCKDEP_WARN(rcu_dynticks_curr_cpu_in_eqs(),
-                        "RCU in extended quiescent state!");
-}
-
 #ifdef CONFIG_PROVE_RCU
 /**
  * rcu_irq_exit_check_preempt - Validate that scheduling is possible
@@ -959,7 +949,7 @@ EXPORT_SYMBOL_GPL(rcu_idle_exit);
  */
 void noinstr rcu_user_exit(void)
 {
-       rcu_eqs_exit(1);
+       rcu_eqs_exit(true);
 }
 
 /**
@@ -1225,7 +1215,7 @@ EXPORT_SYMBOL_GPL(rcu_lockdep_current_cpu_online);
 #endif /* #if defined(CONFIG_PROVE_RCU) && defined(CONFIG_HOTPLUG_CPU) */
 
 /*
- * We are reporting a quiescent state on behalf of some other CPU, so
+ * When trying to report a quiescent state on behalf of some other CPU,
  * it is our responsibility to check for and handle potential overflow
  * of the rcu_node ->gp_seq counter with respect to the rcu_data counters.
  * After all, the CPU might be in deep idle state, and thus executing no
@@ -2048,7 +2038,7 @@ static void rcu_gp_fqs_loop(void)
 /*
  * Clean up after the old grace period.
  */
-static void rcu_gp_cleanup(void)
+static noinline void rcu_gp_cleanup(void)
 {
        int cpu;
        bool needgp = false;
@@ -2489,7 +2479,7 @@ int rcutree_dead_cpu(unsigned int cpu)
 
 /*
  * Invoke any RCU callbacks that have made it to the end of their grace
- * period.  Thottle as specified by rdp->blimit.
+ * period.  Throttle as specified by rdp->blimit.
  */
 static void rcu_do_batch(struct rcu_data *rdp)
 {
@@ -2629,7 +2619,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
  * state, for example, user mode or idle loop.  It also schedules RCU
  * core processing.  If the current grace period has gone on too long,
  * it will ask the scheduler to manufacture a context switch for the sole
- * purpose of providing a providing the needed quiescent state.
+ * purpose of providing the needed quiescent state.
  */
 void rcu_sched_clock_irq(int user)
 {
@@ -2911,7 +2901,6 @@ static int __init rcu_spawn_core_kthreads(void)
                  "%s: Could not start rcuc kthread, OOM is now expected behavior\n", __func__);
        return 0;
 }
-early_initcall(rcu_spawn_core_kthreads);
 
 /*
  * Handle any core-RCU processing required by a call_rcu() invocation.
@@ -3082,12 +3071,14 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func)
  * period elapses, in other words after all pre-existing RCU read-side
  * critical sections have completed.  However, the callback function
  * might well execute concurrently with RCU read-side critical sections
- * that started after call_rcu() was invoked.  RCU read-side critical
- * sections are delimited by rcu_read_lock() and rcu_read_unlock(), and
- * may be nested.  In addition, regions of code across which interrupts,
- * preemption, or softirqs have been disabled also serve as RCU read-side
- * critical sections.  This includes hardware interrupt handlers, softirq
- * handlers, and NMI handlers.
+ * that started after call_rcu() was invoked.
+ *
+ * RCU read-side critical sections are delimited by rcu_read_lock()
+ * and rcu_read_unlock(), and may be nested.  In addition, but only in
+ * v5.0 and later, regions of code across which interrupts, preemption,
+ * or softirqs have been disabled also serve as RCU read-side critical
+ * sections.  This includes hardware interrupt handlers, softirq handlers,
+ * and NMI handlers.
  *
  * Note that all CPUs must agree that the grace period extended beyond
  * all pre-existing RCU read-side critical section.  On systems with more
@@ -3107,6 +3098,9 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func)
  * between the call to call_rcu() and the invocation of "func()" -- even
  * if CPU A and CPU B are the same CPU (but again only if the system has
  * more than one CPU).
+ *
+ * Implementation of these memory-ordering guarantees is described here:
+ * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
  */
 void call_rcu(struct rcu_head *head, rcu_callback_t func)
 {
@@ -3171,6 +3165,7 @@ struct kfree_rcu_cpu_work {
  *     Even though it is lockless an access has to be protected by the
  *     per-cpu lock.
  * @page_cache_work: A work to refill the cache when it is empty
+ * @backoff_page_cache_fill: Delay cache refills
  * @work_in_progress: Indicates that page_cache_work is running
  * @hrtimer: A hrtimer for scheduling a page_cache_work
  * @nr_bkv_objs: number of allocated objects at @bkvcache.
@@ -3190,7 +3185,8 @@ struct kfree_rcu_cpu {
        bool initialized;
        int count;
 
-       struct work_struct page_cache_work;
+       struct delayed_work page_cache_work;
+       atomic_t backoff_page_cache_fill;
        atomic_t work_in_progress;
        struct hrtimer hrtimer;
 
@@ -3237,7 +3233,7 @@ get_cached_bnode(struct kfree_rcu_cpu *krcp)
        if (!krcp->nr_bkv_objs)
                return NULL;
 
-       krcp->nr_bkv_objs--;
+       WRITE_ONCE(krcp->nr_bkv_objs, krcp->nr_bkv_objs - 1);
        return (struct kvfree_rcu_bulk_data *)
                llist_del_first(&krcp->bkvcache);
 }
@@ -3251,14 +3247,33 @@ put_cached_bnode(struct kfree_rcu_cpu *krcp,
                return false;
 
        llist_add((struct llist_node *) bnode, &krcp->bkvcache);
-       krcp->nr_bkv_objs++;
+       WRITE_ONCE(krcp->nr_bkv_objs, krcp->nr_bkv_objs + 1);
        return true;
+}
+
+static int
+drain_page_cache(struct kfree_rcu_cpu *krcp)
+{
+       unsigned long flags;
+       struct llist_node *page_list, *pos, *n;
+       int freed = 0;
 
+       raw_spin_lock_irqsave(&krcp->lock, flags);
+       page_list = llist_del_all(&krcp->bkvcache);
+       WRITE_ONCE(krcp->nr_bkv_objs, 0);
+       raw_spin_unlock_irqrestore(&krcp->lock, flags);
+
+       llist_for_each_safe(pos, n, page_list) {
+               free_page((unsigned long)pos);
+               freed++;
+       }
+
+       return freed;
 }
 
 /*
  * This function is invoked in workqueue context after a grace period.
- * It frees all the objects queued on ->bhead_free or ->head_free.
+ * It frees all the objects queued on ->bkvhead_free or ->head_free.
  */
 static void kfree_rcu_work(struct work_struct *work)
 {
@@ -3285,7 +3300,7 @@ static void kfree_rcu_work(struct work_struct *work)
        krwp->head_free = NULL;
        raw_spin_unlock_irqrestore(&krcp->lock, flags);
 
-       // Handle two first channels.
+       // Handle the first two channels.
        for (i = 0; i < FREE_N_CHANNELS; i++) {
                for (; bkvhead[i]; bkvhead[i] = bnext) {
                        bnext = bkvhead[i]->next;
@@ -3323,9 +3338,11 @@ static void kfree_rcu_work(struct work_struct *work)
        }
 
        /*
-        * Emergency case only. It can happen under low memory
-        * condition when an allocation gets failed, so the "bulk"
-        * path can not be temporary maintained.
+        * This is used when the "bulk" path can not be used for the
+        * double-argument of kvfree_rcu().  This happens when the
+        * page-cache is empty, which means that objects are instead
+        * queued on a linked list through their rcu_head structures.
+        * This list is named "Channel 3".
         */
        for (; head; head = next) {
                unsigned long offset = (unsigned long)head->func;
@@ -3345,34 +3362,31 @@ static void kfree_rcu_work(struct work_struct *work)
 }
 
 /*
- * Schedule the kfree batch RCU work to run in workqueue context after a GP.
- *
- * This function is invoked by kfree_rcu_monitor() when the KFREE_DRAIN_JIFFIES
- * timeout has been reached.
+ * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
  */
-static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
+static void kfree_rcu_monitor(struct work_struct *work)
 {
-       struct kfree_rcu_cpu_work *krwp;
-       bool repeat = false;
+       struct kfree_rcu_cpu *krcp = container_of(work,
+               struct kfree_rcu_cpu, monitor_work.work);
+       unsigned long flags;
        int i, j;
 
-       lockdep_assert_held(&krcp->lock);
+       raw_spin_lock_irqsave(&krcp->lock, flags);
 
+       // Attempt to start a new batch.
        for (i = 0; i < KFREE_N_BATCHES; i++) {
-               krwp = &(krcp->krw_arr[i]);
+               struct kfree_rcu_cpu_work *krwp = &(krcp->krw_arr[i]);
 
-               /*
-                * Try to detach bkvhead or head and attach it over any
-                * available corresponding free channel. It can be that
-                * a previous RCU batch is in progress, it means that
-                * immediately to queue another one is not possible so
-                * return false to tell caller to retry.
-                */
+               // Try to detach bkvhead or head and attach it over any
+               // available corresponding free channel. It can be that
+               // a previous RCU batch is in progress, it means that
+               // immediately to queue another one is not possible so
+               // in that case the monitor work is rearmed.
                if ((krcp->bkvhead[0] && !krwp->bkvhead_free[0]) ||
                        (krcp->bkvhead[1] && !krwp->bkvhead_free[1]) ||
                                (krcp->head && !krwp->head_free)) {
-                       // Channel 1 corresponds to SLAB ptrs.
-                       // Channel 2 corresponds to vmalloc ptrs.
+                       // Channel 1 corresponds to the SLAB-pointer bulk path.
+                       // Channel 2 corresponds to vmalloc-pointer bulk path.
                        for (j = 0; j < FREE_N_CHANNELS; j++) {
                                if (!krwp->bkvhead_free[j]) {
                                        krwp->bkvhead_free[j] = krcp->bkvhead[j];
@@ -3380,7 +3394,8 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
                                }
                        }
 
-                       // Channel 3 corresponds to emergency path.
+                       // Channel 3 corresponds to both SLAB and vmalloc
+                       // objects queued on the linked list.
                        if (!krwp->head_free) {
                                krwp->head_free = krcp->head;
                                krcp->head = NULL;
@@ -3388,65 +3403,35 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
 
                        WRITE_ONCE(krcp->count, 0);
 
-                       /*
-                        * One work is per one batch, so there are three
-                        * "free channels", the batch can handle. It can
-                        * be that the work is in the pending state when
-                        * channels have been detached following by each
-                        * other.
-                        */
+                       // One work is per one batch, so there are three
+                       // "free channels", the batch can handle. It can
+                       // be that the work is in the pending state when
+                       // channels have been detached following by each
+                       // other.
                        queue_rcu_work(system_wq, &krwp->rcu_work);
                }
-
-               // Repeat if any "free" corresponding channel is still busy.
-               if (krcp->bkvhead[0] || krcp->bkvhead[1] || krcp->head)
-                       repeat = true;
        }
 
-       return !repeat;
-}
-
-static inline void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krcp,
-                                         unsigned long flags)
-{
-       // Attempt to start a new batch.
-       krcp->monitor_todo = false;
-       if (queue_kfree_rcu_work(krcp)) {
-               // Success! Our job is done here.
-               raw_spin_unlock_irqrestore(&krcp->lock, flags);
-               return;
-       }
+       // If there is nothing to detach, it means that our job is
+       // successfully done here. In case of having at least one
+       // of the channels that is still busy we should rearm the
+       // work to repeat an attempt. Because previous batches are
+       // still in progress.
+       if (!krcp->bkvhead[0] && !krcp->bkvhead[1] && !krcp->head)
+               krcp->monitor_todo = false;
+       else
+               schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
 
-       // Previous RCU batch still in progress, try again later.
-       krcp->monitor_todo = true;
-       schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
        raw_spin_unlock_irqrestore(&krcp->lock, flags);
 }
 
-/*
- * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
- * It invokes kfree_rcu_drain_unlock() to attempt to start another batch.
- */
-static void kfree_rcu_monitor(struct work_struct *work)
-{
-       unsigned long flags;
-       struct kfree_rcu_cpu *krcp = container_of(work, struct kfree_rcu_cpu,
-                                                monitor_work.work);
-
-       raw_spin_lock_irqsave(&krcp->lock, flags);
-       if (krcp->monitor_todo)
-               kfree_rcu_drain_unlock(krcp, flags);
-       else
-               raw_spin_unlock_irqrestore(&krcp->lock, flags);
-}
-
 static enum hrtimer_restart
 schedule_page_work_fn(struct hrtimer *t)
 {
        struct kfree_rcu_cpu *krcp =
                container_of(t, struct kfree_rcu_cpu, hrtimer);
 
-       queue_work(system_highpri_wq, &krcp->page_cache_work);
+       queue_delayed_work(system_highpri_wq, &krcp->page_cache_work, 0);
        return HRTIMER_NORESTART;
 }
 
@@ -3455,12 +3440,16 @@ static void fill_page_cache_func(struct work_struct *work)
        struct kvfree_rcu_bulk_data *bnode;
        struct kfree_rcu_cpu *krcp =
                container_of(work, struct kfree_rcu_cpu,
-                       page_cache_work);
+                       page_cache_work.work);
        unsigned long flags;
+       int nr_pages;
        bool pushed;
        int i;
 
-       for (i = 0; i < rcu_min_cached_objs; i++) {
+       nr_pages = atomic_read(&krcp->backoff_page_cache_fill) ?
+               1 : rcu_min_cached_objs;
+
+       for (i = 0; i < nr_pages; i++) {
                bnode = (struct kvfree_rcu_bulk_data *)
                        __get_free_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOMEMALLOC | __GFP_NOWARN);
 
@@ -3477,6 +3466,7 @@ static void fill_page_cache_func(struct work_struct *work)
        }
 
        atomic_set(&krcp->work_in_progress, 0);
+       atomic_set(&krcp->backoff_page_cache_fill, 0);
 }
 
 static void
@@ -3484,10 +3474,15 @@ run_page_cache_worker(struct kfree_rcu_cpu *krcp)
 {
        if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING &&
                        !atomic_xchg(&krcp->work_in_progress, 1)) {
-               hrtimer_init(&krcp->hrtimer, CLOCK_MONOTONIC,
-                       HRTIMER_MODE_REL);
-               krcp->hrtimer.function = schedule_page_work_fn;
-               hrtimer_start(&krcp->hrtimer, 0, HRTIMER_MODE_REL);
+               if (atomic_read(&krcp->backoff_page_cache_fill)) {
+                       queue_delayed_work(system_wq,
+                               &krcp->page_cache_work,
+                                       msecs_to_jiffies(rcu_delay_page_cache_fill_msec));
+               } else {
+                       hrtimer_init(&krcp->hrtimer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+                       krcp->hrtimer.function = schedule_page_work_fn;
+                       hrtimer_start(&krcp->hrtimer, 0, HRTIMER_MODE_REL);
+               }
        }
 }
 
@@ -3552,11 +3547,11 @@ add_ptr_to_bulk_krc_lock(struct kfree_rcu_cpu **krcp,
 }
 
 /*
- * Queue a request for lazy invocation of appropriate free routine after a
- * grace period. Please note there are three paths are maintained, two are the
- * main ones that use array of pointers interface and third one is emergency
- * one, that is used only when the main path can not be maintained temporary,
- * due to memory pressure.
+ * Queue a request for lazy invocation of the appropriate free routine
+ * after a grace period.  Please note that three paths are maintained,
+ * two for the common case using arrays of pointers and a third one that
+ * is used only when the main paths cannot be used, for example, due to
+ * memory pressure.
  *
  * Each kvfree_call_rcu() request is added to a batch. The batch will be drained
  * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch will
@@ -3645,6 +3640,8 @@ kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
                struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
                count += READ_ONCE(krcp->count);
+               count += READ_ONCE(krcp->nr_bkv_objs);
+               atomic_set(&krcp->backoff_page_cache_fill, 1);
        }
 
        return count;
@@ -3654,18 +3651,14 @@ static unsigned long
 kfree_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
 {
        int cpu, freed = 0;
-       unsigned long flags;
 
        for_each_possible_cpu(cpu) {
                int count;
                struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
                count = krcp->count;
-               raw_spin_lock_irqsave(&krcp->lock, flags);
-               if (krcp->monitor_todo)
-                       kfree_rcu_drain_unlock(krcp, flags);
-               else
-                       raw_spin_unlock_irqrestore(&krcp->lock, flags);
+               count += drain_page_cache(krcp);
+               kfree_rcu_monitor(&krcp->monitor_work.work);
 
                sc->nr_to_scan -= count;
                freed += count;
@@ -3693,7 +3686,8 @@ void __init kfree_rcu_scheduler_running(void)
                struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
                raw_spin_lock_irqsave(&krcp->lock, flags);
-               if (!krcp->head || krcp->monitor_todo) {
+               if ((!krcp->bkvhead[0] && !krcp->bkvhead[1] && !krcp->head) ||
+                               krcp->monitor_todo) {
                        raw_spin_unlock_irqrestore(&krcp->lock, flags);
                        continue;
                }
@@ -3750,10 +3744,12 @@ static int rcu_blocking_is_gp(void)
  * read-side critical sections have completed.  Note, however, that
  * upon return from synchronize_rcu(), the caller might well be executing
  * concurrently with new RCU read-side critical sections that began while
- * synchronize_rcu() was waiting.  RCU read-side critical sections are
- * delimited by rcu_read_lock() and rcu_read_unlock(), and may be nested.
- * In addition, regions of code across which interrupts, preemption, or
- * softirqs have been disabled also serve as RCU read-side critical
+ * synchronize_rcu() was waiting.
+ *
+ * RCU read-side critical sections are delimited by rcu_read_lock()
+ * and rcu_read_unlock(), and may be nested.  In addition, but only in
+ * v5.0 and later, regions of code across which interrupts, preemption,
+ * or softirqs have been disabled also serve as RCU read-side critical
  * sections.  This includes hardware interrupt handlers, softirq handlers,
  * and NMI handlers.
  *
@@ -3774,6 +3770,9 @@ static int rcu_blocking_is_gp(void)
  * to have executed a full memory barrier during the execution of
  * synchronize_rcu() -- even if CPU A and CPU B are the same CPU (but
  * again only if the system has more than one CPU).
+ *
+ * Implementation of these memory-ordering guarantees is described here:
+ * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
  */
 void synchronize_rcu(void)
 {
@@ -3844,11 +3843,11 @@ EXPORT_SYMBOL_GPL(start_poll_synchronize_rcu);
 /**
  * poll_state_synchronize_rcu - Conditionally wait for an RCU grace period
  *
- * @oldstate: return from call to get_state_synchronize_rcu() or start_poll_synchronize_rcu()
+ * @oldstate: value from get_state_synchronize_rcu() or start_poll_synchronize_rcu()
  *
  * If a full RCU grace period has elapsed since the earlier call from
  * which oldstate was obtained, return @true, otherwise return @false.
- * If @false is returned, it is the caller's responsibilty to invoke this
+ * If @false is returned, it is the caller's responsibility to invoke this
  * function later on until it does return @true.  Alternatively, the caller
  * can explicitly wait for a grace period, for example, by passing @oldstate
  * to cond_synchronize_rcu() or by directly invoking synchronize_rcu().
@@ -3860,6 +3859,11 @@ EXPORT_SYMBOL_GPL(start_poll_synchronize_rcu);
  * (many hours even on 32-bit systems) should check them occasionally
  * and either refresh them or set a flag indicating that the grace period
  * has completed.
+ *
+ * This function provides the same memory-ordering guarantees that
+ * would be provided by a synchronize_rcu() that was invoked at the call
+ * to the function that provided @oldstate, and that returned at the end
+ * of this function.
  */
 bool poll_state_synchronize_rcu(unsigned long oldstate)
 {
@@ -3874,7 +3878,7 @@ EXPORT_SYMBOL_GPL(poll_state_synchronize_rcu);
 /**
  * cond_synchronize_rcu - Conditionally wait for an RCU grace period
  *
- * @oldstate: return value from earlier call to get_state_synchronize_rcu()
+ * @oldstate: value from get_state_synchronize_rcu() or start_poll_synchronize_rcu()
  *
  * If a full RCU grace period has elapsed since the earlier call to
  * get_state_synchronize_rcu() or start_poll_synchronize_rcu(), just return.
@@ -3884,6 +3888,11 @@ EXPORT_SYMBOL_GPL(poll_state_synchronize_rcu);
  * counter wrap is harmless.  If the counter wraps, we have waited for
  * more than 2 billion grace periods (and way more on a 64-bit system!),
  * so waiting for one additional grace period should be just fine.
+ *
+ * This function provides the same memory-ordering guarantees that
+ * would be provided by a synchronize_rcu() that was invoked at the call
+ * to the function that provided @oldstate, and that returned at the end
+ * of this function.
  */
 void cond_synchronize_rcu(unsigned long oldstate)
 {
@@ -3911,7 +3920,7 @@ static int rcu_pending(int user)
        check_cpu_stall(rdp);
 
        /* Does this CPU need a deferred NOCB wakeup? */
-       if (rcu_nocb_need_deferred_wakeup(rdp))
+       if (rcu_nocb_need_deferred_wakeup(rdp, RCU_NOCB_WAKE))
                return 1;
 
        /* Is this a nohz_full CPU in userspace or idle?  (Ignore RCU if so.) */
@@ -4094,7 +4103,7 @@ EXPORT_SYMBOL_GPL(rcu_barrier);
 /*
  * Propagate ->qsinitmask bits up the rcu_node tree to account for the
  * first CPU in a given leaf rcu_node structure coming online.  The caller
- * must hold the corresponding leaf rcu_node ->lock with interrrupts
+ * must hold the corresponding leaf rcu_node ->lock with interrupts
  * disabled.
  */
 static void rcu_init_new_rnp(struct rcu_node *rnp_leaf)
@@ -4189,7 +4198,7 @@ int rcutree_prepare_cpu(unsigned int cpu)
        rdp->rcu_iw_gp_seq = rdp->gp_seq - 1;
        trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("cpuonl"));
        raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
-       rcu_prepare_kthreads(cpu);
+       rcu_spawn_one_boost_kthread(rnp);
        rcu_spawn_cpu_nocb_kthread(cpu);
        WRITE_ONCE(rcu_state.n_online_cpus, rcu_state.n_online_cpus + 1);
 
@@ -4472,6 +4481,7 @@ static int __init rcu_spawn_gp_kthread(void)
        wake_up_process(t);
        rcu_spawn_nocb_kthreads();
        rcu_spawn_boost_kthreads();
+       rcu_spawn_core_kthreads();
        return 0;
 }
 early_initcall(rcu_spawn_gp_kthread);
@@ -4582,11 +4592,25 @@ static void __init rcu_init_one(void)
  * replace the definitions in tree.h because those are needed to size
  * the ->node array in the rcu_state structure.
  */
-static void __init rcu_init_geometry(void)
+void rcu_init_geometry(void)
 {
        ulong d;
        int i;
+       static unsigned long old_nr_cpu_ids;
        int rcu_capacity[RCU_NUM_LVLS];
+       static bool initialized;
+
+       if (initialized) {
+               /*
+                * Warn if setup_nr_cpu_ids() had not yet been invoked,
+                * unless nr_cpus_ids == NR_CPUS, in which case who cares?
+                */
+               WARN_ON_ONCE(old_nr_cpu_ids != nr_cpu_ids);
+               return;
+       }
+
+       old_nr_cpu_ids = nr_cpu_ids;
+       initialized = true;
 
        /*
         * Initialize any unspecified boot parameters.
@@ -4687,6 +4711,18 @@ static void __init kfree_rcu_batch_init(void)
        int cpu;
        int i;
 
+       /* Clamp it to [0:100] seconds interval. */
+       if (rcu_delay_page_cache_fill_msec < 0 ||
+               rcu_delay_page_cache_fill_msec > 100 * MSEC_PER_SEC) {
+
+               rcu_delay_page_cache_fill_msec =
+                       clamp(rcu_delay_page_cache_fill_msec, 0,
+                               (int) (100 * MSEC_PER_SEC));
+
+               pr_info("Adjusting rcutree.rcu_delay_page_cache_fill_msec to %d ms.\n",
+                       rcu_delay_page_cache_fill_msec);
+       }
+
        for_each_possible_cpu(cpu) {
                struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
@@ -4696,7 +4732,7 @@ static void __init kfree_rcu_batch_init(void)
                }
 
                INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
-               INIT_WORK(&krcp->page_cache_work, fill_page_cache_func);
+               INIT_DELAYED_WORK(&krcp->page_cache_work, fill_page_cache_func);
                krcp->initialized = true;
        }
        if (register_shrinker(&kfree_rcu_shrinker))
@@ -4730,12 +4766,11 @@ void __init rcu_init(void)
                rcutree_online_cpu(cpu);
        }
 
-       /* Create workqueue for expedited GPs and for Tree SRCU. */
+       /* Create workqueue for Tree SRCU and for expedited GPs. */
        rcu_gp_wq = alloc_workqueue("rcu_gp", WQ_MEM_RECLAIM, 0);
        WARN_ON(!rcu_gp_wq);
        rcu_par_gp_wq = alloc_workqueue("rcu_par_gp", WQ_MEM_RECLAIM, 0);
        WARN_ON(!rcu_par_gp_wq);
-       srcu_init();
 
        /* Fill in default value for rcutree.qovld boot parameter. */
        /* -After- the rcu_node ->lock fields are initialized! */