srcu: Make Tree SRCU able to operate without snp_node array
authorPaul E. McKenney <paulmck@kernel.org>
Mon, 24 Jan 2022 17:46:57 +0000 (09:46 -0800)
committerPaul E. McKenney <paulmck@kernel.org>
Mon, 11 Apr 2022 22:31:02 +0000 (15:31 -0700)
This commit makes Tree SRCU able to operate without an snp_node
array, that is, when the srcu_data structures' ->mynode pointers
are NULL.  This can result in high contention on the srcu_struct
structure's ->lock, but only when there are lots of call_srcu(),
synchronize_srcu(), and synchronize_srcu_expedited() calls.

Note that when there is no snp_node array, all SRCU callbacks use
CPU 0's callback queue.  This is optimal in the common case of low
update-side load because it removes the need to search each CPU
for the single callback that made the grace period happen.

Co-developed-by: Neeraj Upadhyay <quic_neeraju@quicinc.com>
Signed-off-by: Neeraj Upadhyay <quic_neeraju@quicinc.com>
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
include/linux/srcutree.h
kernel/rcu/srcutree.c

index 4025840..8d1da13 100644 (file)
@@ -63,8 +63,9 @@ struct srcu_struct {
        struct srcu_node node[NUM_RCU_NODES];   /* Combining tree. */
        struct srcu_node *level[RCU_NUM_LVLS + 1];
                                                /* First node at each level. */
+       int srcu_size_state;                    /* Small-to-big transition state. */
        struct mutex srcu_cb_mutex;             /* Serialize CB preparation. */
-       spinlock_t __private lock;              /* Protect counters */
+       spinlock_t __private lock;              /* Protect counters and size state. */
        struct mutex srcu_gp_mutex;             /* Serialize GP work. */
        unsigned int srcu_idx;                  /* Current rdr array element. */
        unsigned long srcu_gp_seq;              /* Grace-period seq #. */
@@ -83,6 +84,17 @@ struct srcu_struct {
        struct lockdep_map dep_map;
 };
 
+/* Values for size state variable (->srcu_size_state). */
+#define SRCU_SIZE_SMALL                0
+#define SRCU_SIZE_ALLOC                1
+#define SRCU_SIZE_WAIT_BARRIER 2
+#define SRCU_SIZE_WAIT_CALL    3
+#define SRCU_SIZE_WAIT_CBS1    4
+#define SRCU_SIZE_WAIT_CBS2    5
+#define SRCU_SIZE_WAIT_CBS3    6
+#define SRCU_SIZE_WAIT_CBS4    7
+#define SRCU_SIZE_BIG          8
+
 /* Values for state variable (bottom bits of ->srcu_gp_seq). */
 #define SRCU_STATE_IDLE                0
 #define SRCU_STATE_SCAN1       1
index 7d13e35..e23696e 100644 (file)
@@ -152,16 +152,17 @@ static void init_srcu_struct_nodes(struct srcu_struct *ssp)
                sdp->ssp = ssp;
                sdp->grpmask = 1 << (cpu - sdp->mynode->grplo);
        }
+       smp_store_release(&ssp->srcu_size_state, SRCU_SIZE_WAIT_BARRIER);
 }
 
 /*
  * Initialize non-compile-time initialized fields, including the
- * associated srcu_node and srcu_data structures.  The is_static
- * parameter is passed through to init_srcu_struct_nodes(), and
- * also tells us that ->sda has already been wired up to srcu_data.
+ * associated srcu_node and srcu_data structures.  The is_static parameter
+ * tells us that ->sda has already been wired up to srcu_data.
  */
 static int init_srcu_struct_fields(struct srcu_struct *ssp, bool is_static)
 {
+       ssp->srcu_size_state = SRCU_SIZE_SMALL;
        mutex_init(&ssp->srcu_cb_mutex);
        mutex_init(&ssp->srcu_gp_mutex);
        ssp->srcu_idx = 0;
@@ -175,6 +176,7 @@ static int init_srcu_struct_fields(struct srcu_struct *ssp, bool is_static)
        if (!ssp->sda)
                return -ENOMEM;
        init_srcu_struct_nodes(ssp);
+       ssp->srcu_size_state = SRCU_SIZE_BIG;
        ssp->srcu_gp_seq_needed_exp = 0;
        ssp->srcu_last_gp_end = ktime_get_mono_fast_ns();
        smp_store_release(&ssp->srcu_gp_seq_needed, 0); /* Init done. */
@@ -391,6 +393,7 @@ void cleanup_srcu_struct(struct srcu_struct *ssp)
        }
        free_percpu(ssp->sda);
        ssp->sda = NULL;
+       ssp->srcu_size_state = SRCU_SIZE_SMALL;
 }
 EXPORT_SYMBOL_GPL(cleanup_srcu_struct);
 
@@ -439,6 +442,10 @@ static void srcu_gp_start(struct srcu_struct *ssp)
        struct srcu_data *sdp = this_cpu_ptr(ssp->sda);
        int state;
 
+       if (smp_load_acquire(&ssp->srcu_size_state) < SRCU_SIZE_WAIT_BARRIER)
+               sdp = per_cpu_ptr(ssp->sda, 0);
+       else
+               sdp = this_cpu_ptr(ssp->sda);
        lockdep_assert_held(&ACCESS_PRIVATE(ssp, lock));
        WARN_ON_ONCE(ULONG_CMP_GE(ssp->srcu_gp_seq, ssp->srcu_gp_seq_needed));
        spin_lock_rcu_node(sdp);  /* Interrupts already disabled. */
@@ -539,38 +546,40 @@ static void srcu_gp_end(struct srcu_struct *ssp)
        /* A new grace period can start at this point.  But only one. */
 
        /* Initiate callback invocation as needed. */
-       idx = rcu_seq_ctr(gpseq) % ARRAY_SIZE(snp->srcu_have_cbs);
-       srcu_for_each_node_breadth_first(ssp, snp) {
-               spin_lock_irq_rcu_node(snp);
-               cbs = false;
-               last_lvl = snp >= ssp->level[rcu_num_lvls - 1];
-               if (last_lvl)
-                       cbs = snp->srcu_have_cbs[idx] == gpseq;
-               snp->srcu_have_cbs[idx] = gpseq;
-               rcu_seq_set_state(&snp->srcu_have_cbs[idx], 1);
-               if (ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, gpseq))
-                       WRITE_ONCE(snp->srcu_gp_seq_needed_exp, gpseq);
-               mask = snp->srcu_data_have_cbs[idx];
-               snp->srcu_data_have_cbs[idx] = 0;
-               spin_unlock_irq_rcu_node(snp);
-               if (cbs)
-                       srcu_schedule_cbs_snp(ssp, snp, mask, cbdelay);
-
-               /* Occasionally prevent srcu_data counter wrap. */
-               if (!(gpseq & counter_wrap_check) && last_lvl)
-                       for (cpu = snp->grplo; cpu <= snp->grphi; cpu++) {
-                               sdp = per_cpu_ptr(ssp->sda, cpu);
-                               spin_lock_irqsave_rcu_node(sdp, flags);
-                               if (ULONG_CMP_GE(gpseq,
-                                                sdp->srcu_gp_seq_needed + 100))
-                                       sdp->srcu_gp_seq_needed = gpseq;
-                               if (ULONG_CMP_GE(gpseq,
-                                                sdp->srcu_gp_seq_needed_exp + 100))
-                                       sdp->srcu_gp_seq_needed_exp = gpseq;
-                               spin_unlock_irqrestore_rcu_node(sdp, flags);
-                       }
+       if (smp_load_acquire(&ssp->srcu_size_state) < SRCU_SIZE_WAIT_BARRIER) {
+               srcu_schedule_cbs_sdp(per_cpu_ptr(ssp->sda, 0), cbdelay);
+       } else {
+               idx = rcu_seq_ctr(gpseq) % ARRAY_SIZE(snp->srcu_have_cbs);
+               srcu_for_each_node_breadth_first(ssp, snp) {
+                       spin_lock_irq_rcu_node(snp);
+                       cbs = false;
+                       last_lvl = snp >= ssp->level[rcu_num_lvls - 1];
+                       if (last_lvl)
+                               cbs = snp->srcu_have_cbs[idx] == gpseq;
+                       snp->srcu_have_cbs[idx] = gpseq;
+                       rcu_seq_set_state(&snp->srcu_have_cbs[idx], 1);
+                       if (ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, gpseq))
+                               WRITE_ONCE(snp->srcu_gp_seq_needed_exp, gpseq);
+                       mask = snp->srcu_data_have_cbs[idx];
+                       snp->srcu_data_have_cbs[idx] = 0;
+                       spin_unlock_irq_rcu_node(snp);
+                       if (cbs)
+                               srcu_schedule_cbs_snp(ssp, snp, mask, cbdelay);
+               }
        }
 
+       /* Occasionally prevent srcu_data counter wrap. */
+       if (!(gpseq & counter_wrap_check))
+               for_each_possible_cpu(cpu) {
+                       sdp = per_cpu_ptr(ssp->sda, cpu);
+                       spin_lock_irqsave_rcu_node(sdp, flags);
+                       if (ULONG_CMP_GE(gpseq, sdp->srcu_gp_seq_needed + 100))
+                               sdp->srcu_gp_seq_needed = gpseq;
+                       if (ULONG_CMP_GE(gpseq, sdp->srcu_gp_seq_needed_exp + 100))
+                               sdp->srcu_gp_seq_needed_exp = gpseq;
+                       spin_unlock_irqrestore_rcu_node(sdp, flags);
+               }
+
        /* Callback initiation done, allow grace periods after next. */
        mutex_unlock(&ssp->srcu_cb_mutex);
 
@@ -599,18 +608,19 @@ static void srcu_funnel_exp_start(struct srcu_struct *ssp, struct srcu_node *snp
 {
        unsigned long flags;
 
-       for (; snp != NULL; snp = snp->srcu_parent) {
-               if (rcu_seq_done(&ssp->srcu_gp_seq, s) ||
-                   ULONG_CMP_GE(READ_ONCE(snp->srcu_gp_seq_needed_exp), s))
-                       return;
-               spin_lock_irqsave_rcu_node(snp, flags);
-               if (ULONG_CMP_GE(snp->srcu_gp_seq_needed_exp, s)) {
+       if (snp)
+               for (; snp != NULL; snp = snp->srcu_parent) {
+                       if (rcu_seq_done(&ssp->srcu_gp_seq, s) ||
+                           ULONG_CMP_GE(READ_ONCE(snp->srcu_gp_seq_needed_exp), s))
+                               return;
+                       spin_lock_irqsave_rcu_node(snp, flags);
+                       if (ULONG_CMP_GE(snp->srcu_gp_seq_needed_exp, s)) {
+                               spin_unlock_irqrestore_rcu_node(snp, flags);
+                               return;
+                       }
+                       WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s);
                        spin_unlock_irqrestore_rcu_node(snp, flags);
-                       return;
                }
-               WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s);
-               spin_unlock_irqrestore_rcu_node(snp, flags);
-       }
        spin_lock_irqsave_rcu_node(ssp, flags);
        if (ULONG_CMP_LT(ssp->srcu_gp_seq_needed_exp, s))
                WRITE_ONCE(ssp->srcu_gp_seq_needed_exp, s);
@@ -633,36 +643,37 @@ static void srcu_funnel_gp_start(struct srcu_struct *ssp, struct srcu_data *sdp,
        unsigned long flags;
        int idx = rcu_seq_ctr(s) % ARRAY_SIZE(sdp->mynode->srcu_have_cbs);
        struct srcu_node *snp;
-       struct srcu_node *snp_leaf = sdp->mynode;
+       struct srcu_node *snp_leaf = smp_load_acquire(&sdp->mynode);
        unsigned long snp_seq;
 
-       /* Each pass through the loop does one level of the srcu_node tree. */
-       for (snp = snp_leaf; snp != NULL; snp = snp->srcu_parent) {
-               if (rcu_seq_done(&ssp->srcu_gp_seq, s) && snp != snp_leaf)
-                       return; /* GP already done and CBs recorded. */
-               spin_lock_irqsave_rcu_node(snp, flags);
-               if (ULONG_CMP_GE(snp->srcu_have_cbs[idx], s)) {
-                       snp_seq = snp->srcu_have_cbs[idx];
-                       if (snp == snp_leaf && snp_seq == s)
-                               snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
-                       spin_unlock_irqrestore_rcu_node(snp, flags);
-                       if (snp == snp_leaf && snp_seq != s) {
-                               srcu_schedule_cbs_sdp(sdp, do_norm
-                                                          ? SRCU_INTERVAL
-                                                          : 0);
+       if (snp_leaf)
+               /* Each pass through the loop does one level of the srcu_node tree. */
+               for (snp = snp_leaf; snp != NULL; snp = snp->srcu_parent) {
+                       if (rcu_seq_done(&ssp->srcu_gp_seq, s) && snp != snp_leaf)
+                               return; /* GP already done and CBs recorded. */
+                       spin_lock_irqsave_rcu_node(snp, flags);
+                       if (ULONG_CMP_GE(snp->srcu_have_cbs[idx], s)) {
+                               snp_seq = snp->srcu_have_cbs[idx];
+                               if (snp == snp_leaf && snp_seq == s)
+                                       snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
+                               spin_unlock_irqrestore_rcu_node(snp, flags);
+                               if (snp == snp_leaf && snp_seq != s) {
+                                       srcu_schedule_cbs_sdp(sdp, do_norm
+                                                                  ? SRCU_INTERVAL
+                                                                  : 0);
+                                       return;
+                               }
+                               if (!do_norm)
+                                       srcu_funnel_exp_start(ssp, snp, s);
                                return;
                        }
-                       if (!do_norm)
-                               srcu_funnel_exp_start(ssp, snp, s);
-                       return;
+                       snp->srcu_have_cbs[idx] = s;
+                       if (snp == snp_leaf)
+                               snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
+                       if (!do_norm && ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, s))
+                               WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s);
+                       spin_unlock_irqrestore_rcu_node(snp, flags);
                }
-               snp->srcu_have_cbs[idx] = s;
-               if (snp == snp_leaf)
-                       snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
-               if (!do_norm && ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, s))
-                       WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s);
-               spin_unlock_irqrestore_rcu_node(snp, flags);
-       }
 
        /* Top of tree, must ensure the grace period will be started. */
        spin_lock_irqsave_rcu_node(ssp, flags);
@@ -820,7 +831,10 @@ static unsigned long srcu_gp_start_if_needed(struct srcu_struct *ssp,
 
        check_init_srcu_struct(ssp);
        idx = srcu_read_lock(ssp);
-       sdp = raw_cpu_ptr(ssp->sda);
+       if (smp_load_acquire(&ssp->srcu_size_state) < SRCU_SIZE_WAIT_CALL)
+               sdp = per_cpu_ptr(ssp->sda, 0);
+       else
+               sdp = raw_cpu_ptr(ssp->sda);
        spin_lock_irqsave_rcu_node(sdp, flags);
        if (rhp)
                rcu_segcblist_enqueue(&sdp->srcu_cblist, rhp);
@@ -840,7 +854,7 @@ static unsigned long srcu_gp_start_if_needed(struct srcu_struct *ssp,
        if (needgp)
                srcu_funnel_gp_start(ssp, sdp, s, do_norm);
        else if (needexp)
-               srcu_funnel_exp_start(ssp, sdp->mynode, s);
+               srcu_funnel_exp_start(ssp, smp_load_acquire(&sdp->mynode), s);
        srcu_read_unlock(ssp, idx);
        return s;
 }
@@ -1100,6 +1114,28 @@ static void srcu_barrier_cb(struct rcu_head *rhp)
                complete(&ssp->srcu_barrier_completion);
 }
 
+/*
+ * Enqueue an srcu_barrier() callback on the specified srcu_data
+ * structure's ->cblist.  but only if that ->cblist already has at least one
+ * callback enqueued.  Note that if a CPU already has callbacks enqueue,
+ * it must have already registered the need for a future grace period,
+ * so all we need do is enqueue a callback that will use the same grace
+ * period as the last callback already in the queue.
+ */
+static void srcu_barrier_one_cpu(struct srcu_struct *ssp, struct srcu_data *sdp)
+{
+       spin_lock_irq_rcu_node(sdp);
+       atomic_inc(&ssp->srcu_barrier_cpu_cnt);
+       sdp->srcu_barrier_head.func = srcu_barrier_cb;
+       debug_rcu_head_queue(&sdp->srcu_barrier_head);
+       if (!rcu_segcblist_entrain(&sdp->srcu_cblist,
+                                  &sdp->srcu_barrier_head)) {
+               debug_rcu_head_unqueue(&sdp->srcu_barrier_head);
+               atomic_dec(&ssp->srcu_barrier_cpu_cnt);
+       }
+       spin_unlock_irq_rcu_node(sdp);
+}
+
 /**
  * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete.
  * @ssp: srcu_struct on which to wait for in-flight callbacks.
@@ -1107,7 +1143,6 @@ static void srcu_barrier_cb(struct rcu_head *rhp)
 void srcu_barrier(struct srcu_struct *ssp)
 {
        int cpu;
-       struct srcu_data *sdp;
        unsigned long s = rcu_seq_snap(&ssp->srcu_barrier_seq);
 
        check_init_srcu_struct(ssp);
@@ -1123,27 +1158,11 @@ void srcu_barrier(struct srcu_struct *ssp)
        /* Initial count prevents reaching zero until all CBs are posted. */
        atomic_set(&ssp->srcu_barrier_cpu_cnt, 1);
 
-       /*
-        * Each pass through this loop enqueues a callback, but only
-        * on CPUs already having callbacks enqueued.  Note that if
-        * a CPU already has callbacks enqueue, it must have already
-        * registered the need for a future grace period, so all we
-        * need do is enqueue a callback that will use the same
-        * grace period as the last callback already in the queue.
-        */
-       for_each_possible_cpu(cpu) {
-               sdp = per_cpu_ptr(ssp->sda, cpu);
-               spin_lock_irq_rcu_node(sdp);
-               atomic_inc(&ssp->srcu_barrier_cpu_cnt);
-               sdp->srcu_barrier_head.func = srcu_barrier_cb;
-               debug_rcu_head_queue(&sdp->srcu_barrier_head);
-               if (!rcu_segcblist_entrain(&sdp->srcu_cblist,
-                                          &sdp->srcu_barrier_head)) {
-                       debug_rcu_head_unqueue(&sdp->srcu_barrier_head);
-                       atomic_dec(&ssp->srcu_barrier_cpu_cnt);
-               }
-               spin_unlock_irq_rcu_node(sdp);
-       }
+       if (smp_load_acquire(&ssp->srcu_size_state) < SRCU_SIZE_WAIT_BARRIER)
+               srcu_barrier_one_cpu(ssp, per_cpu_ptr(ssp->sda, 0));
+       else
+               for_each_possible_cpu(cpu)
+                       srcu_barrier_one_cpu(ssp, per_cpu_ptr(ssp->sda, cpu));
 
        /* Remove the initial count, at which point reaching zero can happen. */
        if (atomic_dec_and_test(&ssp->srcu_barrier_cpu_cnt))