ring-buffer: speed up buffer resets by avoiding synchronize_rcu for each CPU
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 00867ff..ed19413 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -270,6 +270,9 @@ EXPORT_SYMBOL_GPL(ring_buffer_event_data);
 #define for_each_buffer_cpu(buffer, cpu)               \
        for_each_cpu(cpu, buffer->cpumask)
 
+#define for_each_online_buffer_cpu(buffer, cpu)                \
+       for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)
+
 #define TS_SHIFT       27
 #define TS_MASK                ((1ULL << TS_SHIFT) - 1)
 #define TS_DELTA_TEST  (~TS_MASK)
@@ -418,6 +421,19 @@ struct rb_event_info {
        int                     add_timestamp;
 };
 
+/*
+ * Used for the add_timestamp field:
+ *  NONE - no extra time stamp is needed
+ *  EXTEND - wants a time extend
+ *  ABSOLUTE - the buffer requests all events to have absolute time stamps
+ *  FORCE - force a full time stamp.
+ */
+enum {
+       RB_ADD_STAMP_NONE               = 0,
+       RB_ADD_STAMP_EXTEND             = BIT(1),
+       RB_ADD_STAMP_ABSOLUTE           = BIT(2),
+       RB_ADD_STAMP_FORCE              = BIT(3)
+};
 /*
  * Used for which event context the event is in.
  *  NMI     = 0
@@ -435,6 +451,28 @@ enum {
        RB_CTX_MAX
 };
 
+#if BITS_PER_LONG == 32
+#define RB_TIME_32
+#endif
+
+/* To test on 64 bit machines */
+//#define RB_TIME_32
+
+#ifdef RB_TIME_32
+
+struct rb_time_struct {
+       local_t         cnt;
+       local_t         top;
+       local_t         bottom;
+};
+#else
+#include <asm/local64.h>
+struct rb_time_struct {
+       local64_t       time;
+};
+#endif
+typedef struct rb_time_struct rb_time_t;
+
 /*
  * head_page == tail_page && head == tail then buffer is empty.
  */
@@ -470,7 +508,8 @@ struct ring_buffer_per_cpu {
        size_t                          shortest_full;
        unsigned long                   read;
        unsigned long                   read_bytes;
-       u64                             write_stamp;
+       rb_time_t                       write_stamp;
+       rb_time_t                       before_stamp;
        u64                             read_stamp;
        /* ring buffer pages to update, > 0 to add, < 0 to remove */
        long                            nr_pages_to_update;
@@ -513,6 +552,189 @@ struct ring_buffer_iter {
        int                             missed_events;
 };
 
+#ifdef RB_TIME_32
+
+/*
+ * On 32 bit machines, local64_t is very expensive. As the ring
+ * buffer doesn't need all the features of a true 64 bit atomic,
+ * on 32 bit, it uses these functions (64 still uses local64_t).
+ *
+ * For the ring buffer, 64 bit required operations for the time is
+ * the following:
+ *
+ *  - Only 59 bits are needed (60 are used to keep the split even).
+ *  - Reads may fail if they interrupt a modification of the time stamp.
+ *      A read will succeed if it did not interrupt another write, even if
+ *      the read itself is interrupted by a write.
+ *      It returns whether or not it was successful.
+ *
+ *  - Writes always succeed and will overwrite other writes and writes
+ *      that were done by events interrupting the current write.
+ *
+ *  - A write followed by a read of the same time stamp will always succeed,
+ *      but may not contain the same value.
+ *
+ *  - A cmpxchg will fail if it interrupted another write or cmpxchg.
+ *      Other than that, it acts like a normal cmpxchg.
+ *
+ * The 60 bit time stamp is split into a top and a bottom half of 30 bits each
+ *  (the bottom half being the least significant 30 bits of the 60 bit time stamp).
+ *
+ * The two most significant bits of each half hold a 2 bit counter (0-3).
+ * Each update will increment this counter by one.
+ * When reading the top and bottom, if the two counter bits match then the
+ *  top and bottom together make a valid 60 bit number.
+ */
+#define RB_TIME_SHIFT  30
+#define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1)
+
+static inline int rb_time_cnt(unsigned long val)
+{
+       return (val >> RB_TIME_SHIFT) & 3;
+}
+
+static inline u64 rb_time_val(unsigned long top, unsigned long bottom)
+{
+       u64 val;
+
+       val = top & RB_TIME_VAL_MASK;
+       val <<= RB_TIME_SHIFT;
+       val |= bottom & RB_TIME_VAL_MASK;
+
+       return val;
+}
+
+static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt)
+{
+       unsigned long top, bottom;
+       unsigned long c;
+
+       /*
+        * If the read is interrupted by a write, then the cnt will
+        * be different. Loop until both top and bottom have been read
+        * without interruption.
+        */
+       do {
+               c = local_read(&t->cnt);
+               top = local_read(&t->top);
+               bottom = local_read(&t->bottom);
+       } while (c != local_read(&t->cnt));
+
+       *cnt = rb_time_cnt(top);
+
+       /* If top and bottom counts don't match, this interrupted a write */
+       if (*cnt != rb_time_cnt(bottom))
+               return false;
+
+       *ret = rb_time_val(top, bottom);
+       return true;
+}
+
+static bool rb_time_read(rb_time_t *t, u64 *ret)
+{
+       unsigned long cnt;
+
+       return __rb_time_read(t, ret, &cnt);
+}
+
+static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt)
+{
+       return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT);
+}
+
+static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom)
+{
+       *top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK);
+       *bottom = (unsigned long)(val & RB_TIME_VAL_MASK);
+}
+
+static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt)
+{
+       val = rb_time_val_cnt(val, cnt);
+       local_set(t, val);
+}
+
+static void rb_time_set(rb_time_t *t, u64 val)
+{
+       unsigned long cnt, top, bottom;
+
+       rb_time_split(val, &top, &bottom);
+
+       /* Writes always succeed with a valid number even if it gets interrupted. */
+       do {
+               cnt = local_inc_return(&t->cnt);
+               rb_time_val_set(&t->top, top, cnt);
+               rb_time_val_set(&t->bottom, bottom, cnt);
+       } while (cnt != local_read(&t->cnt));
+}
+
+static inline bool
+rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set)
+{
+       unsigned long ret;
+
+       ret = local_cmpxchg(l, expect, set);
+       return ret == expect;
+}
+
+static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
+{
+       unsigned long cnt, top, bottom;
+       unsigned long cnt2, top2, bottom2;
+       u64 val;
+
+       /* The cmpxchg always fails if it interrupted an update */
+       if (!__rb_time_read(t, &val, &cnt2))
+               return false;
+
+       if (val != expect)
+               return false;
+
+       cnt = local_read(&t->cnt);
+       if ((cnt & 3) != cnt2)
+               return false;
+
+       cnt2 = cnt + 1;
+
+       rb_time_split(val, &top, &bottom);
+       top = rb_time_val_cnt(top, cnt);
+       bottom = rb_time_val_cnt(bottom, cnt);
+
+       rb_time_split(set, &top2, &bottom2);
+       top2 = rb_time_val_cnt(top2, cnt2);
+       bottom2 = rb_time_val_cnt(bottom2, cnt2);
+
+       if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2))
+               return false;
+       if (!rb_time_read_cmpxchg(&t->top, top, top2))
+               return false;
+       if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2))
+               return false;
+       return true;
+}
+
+#else /* 64 bits */
+
+/* local64_t always succeeds */
+
+static inline bool rb_time_read(rb_time_t *t, u64 *ret)
+{
+       *ret = local64_read(&t->time);
+       return true;
+}
+static void rb_time_set(rb_time_t *t, u64 val)
+{
+       local64_set(&t->time, val);
+}
+
+static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
+{
+       u64 val;
+       val = local64_cmpxchg(&t->time, expect, set);
+       return val == expect;
+}
+#endif
+
 /**
  * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
  * @buffer: The ring_buffer to get the number of pages from
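As an aside, the 30/30-bit layout with a 2-bit validity counter in each half is easier to see in isolation. Below is a minimal userspace sketch of that encoding (plain integers stand in for local_t and the helper name is invented for the example; only the bit layout mirrors the code above):

#include <stdio.h>
#include <stdint.h>

#define SHIFT		30
#define VAL_MASK	((1UL << SHIFT) - 1)

/* tag a 30 bit half with the 2 bit update counter (bits 30-31) */
static unsigned long tag(unsigned long half, unsigned long cnt)
{
	return (half & VAL_MASK) | ((cnt & 3) << SHIFT);
}

int main(void)
{
	uint64_t ts = 0x123456789abcdULL;	/* any value below 2^60 */
	unsigned long cnt = 2;			/* incremented on every update */
	unsigned long top = tag(ts >> SHIFT, cnt);
	unsigned long bottom = tag(ts, cnt);

	/* a read is only valid if both halves carry the same counter */
	if ((top >> SHIFT) == (bottom >> SHIFT)) {
		uint64_t val = ((uint64_t)(top & VAL_MASK) << SHIFT) |
				(bottom & VAL_MASK);
		printf("recombined %#llx, matches: %d\n",
		       (unsigned long long)val, val == ts);
	}
	return 0;
}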
@@ -2416,16 +2638,13 @@ rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
        unsigned length = info->length;
        u64 delta = info->delta;
 
-       /* Only a commit updates the timestamp */
-       if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
-               delta = 0;
-
        /*
         * If we need to add a timestamp, then we
         * add it to the start of the reserved space.
         */
        if (unlikely(info->add_timestamp)) {
-               bool abs = ring_buffer_time_stamp_abs(cpu_buffer->buffer);
+               bool abs = info->add_timestamp &
+                       (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);
 
                event = rb_add_time_stamp(event, abs ? info->delta : delta, abs);
                length -= RB_LEN_TIME_EXTEND;
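Since add_timestamp is now a set of RB_ADD_STAMP_* bits rather than a plain boolean, the test above picks an absolute stamp whenever FORCE or ABSOLUTE is present. A tiny standalone illustration of that check (flag values copied from the enum added earlier; everything else is made up for the example):

#include <stdio.h>

/* values from the RB_ADD_STAMP_* enum above */
#define RB_ADD_STAMP_NONE	0
#define RB_ADD_STAMP_EXTEND	(1 << 1)
#define RB_ADD_STAMP_ABSOLUTE	(1 << 2)
#define RB_ADD_STAMP_FORCE	(1 << 3)

int main(void)
{
	int cases[] = {
		RB_ADD_STAMP_EXTEND,				/* delta too big for 27 bits */
		RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND,	/* interrupted a stamp update */
		RB_ADD_STAMP_ABSOLUTE,				/* buffer wants absolute stamps */
	};

	for (int i = 0; i < 3; i++) {
		/* same test as in rb_update_event() */
		int abs = cases[i] & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);

		printf("add_timestamp=%#x -> absolute: %s\n",
		       cases[i], abs ? "yes" : "no");
	}
	return 0;
}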
@@ -2480,6 +2699,39 @@ static inline bool sched_clock_stable(void)
 }
 #endif
 
+static __always_inline bool
+rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
+                  struct ring_buffer_event *event)
+{
+       unsigned long addr = (unsigned long)event;
+       unsigned long index;
+
+       index = rb_event_index(event);
+       addr &= PAGE_MASK;
+
+       return cpu_buffer->commit_page->page == (void *)addr &&
+               rb_commit_index(cpu_buffer) == index;
+}
+
+static u64 rb_time_delta(struct ring_buffer_event *event)
+{
+       switch (event->type_len) {
+       case RINGBUF_TYPE_PADDING:
+               return 0;
+
+       case RINGBUF_TYPE_TIME_EXTEND:
+               return ring_buffer_event_time_stamp(event);
+
+       case RINGBUF_TYPE_TIME_STAMP:
+               return 0;
+
+       case RINGBUF_TYPE_DATA:
+               return event->time_delta;
+       default:
+               return 0;
+       }
+}
+
 static inline int
 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
                  struct ring_buffer_event *event)
@@ -2488,6 +2740,8 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
        struct buffer_page *bpage;
        unsigned long index;
        unsigned long addr;
+       u64 write_stamp;
+       u64 delta;
 
        new_index = rb_event_index(event);
        old_index = new_index + rb_event_ts_length(event);
@@ -2496,10 +2750,32 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
 
        bpage = READ_ONCE(cpu_buffer->tail_page);
 
+       delta = rb_time_delta(event);
+
+       if (!rb_time_read(&cpu_buffer->write_stamp, &write_stamp))
+               return 0;
+
+       /* Make sure the write stamp is read before testing the location */
+       barrier();
+
        if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
                unsigned long write_mask =
                        local_read(&bpage->write) & ~RB_WRITE_MASK;
                unsigned long event_length = rb_event_length(event);
+
+               /* Something came in, can't discard */
+               if (!rb_time_cmpxchg(&cpu_buffer->write_stamp,
+                                      write_stamp, write_stamp - delta))
+                       return 0;
+
+               /*
+                * If an event were to come in now, it would see that the
+                * write_stamp and the before_stamp are different, and assume
+                * that this event just added itself before updating
+                * the write stamp. The interrupting event will fix the
+                * write stamp for us, and use the before stamp as its delta.
+                */
+
                /*
                 * This is on the tail page. It is possible that
                 * a write could come in and move the tail page
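The new discard logic above only rolls write_stamp back by the event's delta when the cmpxchg proves nothing else touched the stamp in the meantime. The same "undo only if unchanged" pattern in self-contained userspace form, with C11 atomics standing in for the rb_time cmpxchg (names are illustrative, not the kernel API):

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

static _Atomic uint64_t write_stamp;

/* returns 0 if an interrupting writer moved the stamp since we read it */
static int try_undo_delta(uint64_t delta)
{
	uint64_t old = atomic_load(&write_stamp);

	return atomic_compare_exchange_strong(&write_stamp, &old, old - delta);
}

int main(void)
{
	atomic_store(&write_stamp, 1000);

	if (try_undo_delta(10))		/* nobody raced with us: discard is safe */
		printf("discarded, stamp rolled back to %llu\n",
		       (unsigned long long)atomic_load(&write_stamp));
	return 0;
}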
@@ -2551,10 +2827,6 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
                local_set(&cpu_buffer->commit_page->page->commit,
                          rb_page_write(cpu_buffer->commit_page));
                rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
-               /* Only update the write stamp if the page has an event */
-               if (rb_page_write(cpu_buffer->commit_page))
-                       cpu_buffer->write_stamp =
-                               cpu_buffer->commit_page->page->time_stamp;
                /* add barrier to keep gcc from optimizing too much */
                barrier();
        }
@@ -2626,54 +2898,10 @@ static inline void rb_event_discard(struct ring_buffer_event *event)
                event->time_delta = 1;
 }
 
-static __always_inline bool
-rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
-                  struct ring_buffer_event *event)
-{
-       unsigned long addr = (unsigned long)event;
-       unsigned long index;
-
-       index = rb_event_index(event);
-       addr &= PAGE_MASK;
-
-       return cpu_buffer->commit_page->page == (void *)addr &&
-               rb_commit_index(cpu_buffer) == index;
-}
-
-static __always_inline void
-rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
-                     struct ring_buffer_event *event)
-{
-       u64 delta;
-
-       /*
-        * The event first in the commit queue updates the
-        * time stamp.
-        */
-       if (rb_event_is_commit(cpu_buffer, event)) {
-               /*
-                * A commit event that is first on a page
-                * updates the write timestamp with the page stamp
-                */
-               if (!rb_event_index(event))
-                       cpu_buffer->write_stamp =
-                               cpu_buffer->commit_page->page->time_stamp;
-               else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
-                       delta = ring_buffer_event_time_stamp(event);
-                       cpu_buffer->write_stamp += delta;
-               } else if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
-                       delta = ring_buffer_event_time_stamp(event);
-                       cpu_buffer->write_stamp = delta;
-               } else
-                       cpu_buffer->write_stamp += event->time_delta;
-       }
-}
-
 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
                      struct ring_buffer_event *event)
 {
        local_inc(&cpu_buffer->entries);
-       rb_update_write_stamp(cpu_buffer, event);
        rb_end_commit(cpu_buffer);
 }
 
@@ -2865,20 +3093,21 @@ int ring_buffer_unlock_commit(struct trace_buffer *buffer,
 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
 
 static noinline void
-rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
-                   struct rb_event_info *info)
+rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
+                  struct rb_event_info *info)
 {
+       u64 write_stamp;
+
        WARN_ONCE(info->delta > (1ULL << 59),
                  KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
                  (unsigned long long)info->delta,
                  (unsigned long long)info->ts,
-                 (unsigned long long)cpu_buffer->write_stamp,
+                 (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0),
                  sched_clock_stable() ? "" :
                  "If you just came from a suspend/resume,\n"
                  "please switch to the trace global clock:\n"
                  "  echo global > /sys/kernel/debug/tracing/trace_clock\n"
                  "or add trace_clock=global to the kernel command line\n");
-       info->add_timestamp = 1;
 }
 
 static struct ring_buffer_event *
@@ -2887,7 +3116,41 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
 {
        struct ring_buffer_event *event;
        struct buffer_page *tail_page;
-       unsigned long tail, write;
+       unsigned long tail, write, w;
+       u64 before, after;
+       bool a_ok;
+       bool b_ok;
+
+       /* Don't let the compiler play games with cpu_buffer->tail_page */
+       tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
+
+ /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK;
+       barrier();
+       b_ok = rb_time_read(&cpu_buffer->before_stamp, &before);
+       a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
+       barrier();
+       info->ts = rb_time_stamp(cpu_buffer->buffer);
+
+       if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
+               info->delta = info->ts;
+               info->add_timestamp = RB_ADD_STAMP_ABSOLUTE;
+       } else {
+               info->delta = info->ts - after;
+       }
+
+       if (likely(a_ok && b_ok)) {
+               if (unlikely(test_time_stamp(info->delta))) {
+                       rb_check_timestamp(cpu_buffer, info);
+                       info->add_timestamp |= RB_ADD_STAMP_EXTEND;
+               }
+       }
+
+       /*
+        * If interrupting an event time update, we may need an absolute timestamp.
+        * Don't bother if this is the start of a new page (w == 0).
+        */
+       if (unlikely(!a_ok || !b_ok || (before != after && w)))
+               info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
 
        /*
         * If the time delta since the last event is too big to
@@ -2897,25 +3160,95 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
        if (unlikely(info->add_timestamp))
                info->length += RB_LEN_TIME_EXTEND;
 
-       /* Don't let the compiler play games with cpu_buffer->tail_page */
-       tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
-       write = local_add_return(info->length, &tail_page->write);
+ /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts);
+
+ /*C*/ write = local_add_return(info->length, &tail_page->write);
 
        /* set write to only the index of the write */
        write &= RB_WRITE_MASK;
+
        tail = write - info->length;
 
+       /* See if we shot past the end of this buffer page */
+       if (unlikely(write > BUF_PAGE_SIZE)) {
+               if (tail != w) {
+                       /* before and after may now be different, fix it up */
+                       b_ok = rb_time_read(&cpu_buffer->before_stamp, &before);
+                       a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
+                       if (a_ok && b_ok && before != after)
+                               (void)rb_time_cmpxchg(&cpu_buffer->before_stamp, before, after);
+               }
+               return rb_move_tail(cpu_buffer, tail, info);
+       }
+
+       if (likely(tail == w)) {
+               u64 save_before;
+               bool s_ok;
+
+               /* Nothing interrupted us between A and C */
+ /*D*/         rb_time_set(&cpu_buffer->write_stamp, info->ts);
+               barrier();
+ /*E*/         s_ok = rb_time_read(&cpu_buffer->before_stamp, &save_before);
+               RB_WARN_ON(cpu_buffer, !s_ok);
+               if (likely(!(info->add_timestamp &
+                            (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
+                       /* This did not interrupt any time update */
+                       info->delta = info->ts - after;
+               else
+                       /* Just use the full timestamp for the interrupting event */
+                       info->delta = info->ts;
+               barrier();
+               if (unlikely(info->ts != save_before)) {
+                       /* SLOW PATH - Interrupted between C and E */
+
+                       a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
+                       RB_WARN_ON(cpu_buffer, !a_ok);
+
+                       /* Write stamp must only go forward */
+                       if (save_before > after) {
+                               /*
+                                * We do not care about the result, only that
+                                * it gets updated atomically.
+                                */
+                               (void)rb_time_cmpxchg(&cpu_buffer->write_stamp, after, save_before);
+                       }
+               }
+       } else {
+               u64 ts;
+               /* SLOW PATH - Interrupted between A and C */
+               a_ok = rb_time_read(&cpu_buffer->write_stamp, &after);
+               /* Was interrupted before here, write_stamp must be valid */
+               RB_WARN_ON(cpu_buffer, !a_ok);
+               ts = rb_time_stamp(cpu_buffer->buffer);
+               barrier();
+ /*E*/         if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
+                   after < ts) {
+                       /* Nothing came after this event between C and E */
+                       info->delta = ts - after;
+                       (void)rb_time_cmpxchg(&cpu_buffer->write_stamp, after, info->ts);
+                       info->ts = ts;
+               } else {
+                       /*
+                        * Interrupted between C and E:
+                        * Lost the previous event's time stamp. Just set the
+                        * delta to zero, so this event gets the same time stamp
+                        * as the event it interrupted. The events that came
+                        * after this one will still be correct (as they would
+                        * have built their deltas on the previous event).
+                        */
+                       info->delta = 0;
+               }
+               info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
+       }
+
        /*
         * If this is the first commit on the page, then it has the same
         * timestamp as the page itself.
         */
-       if (!tail && !ring_buffer_time_stamp_abs(cpu_buffer->buffer))
+       if (unlikely(!tail && !(info->add_timestamp &
+                               (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
                info->delta = 0;
 
-       /* See if we shot pass the end of this buffer page */
-       if (unlikely(write > BUF_PAGE_SIZE))
-               return rb_move_tail(cpu_buffer, tail, info);
-
        /* We reserved something on the buffer */
 
        event = __rb_page_index(tail_page, tail);
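The A/B/C/D/E labels above encode an ordering argument: w is sampled at A, before_stamp is written at B, and the reservation happens at C, so tail == w can only hold if no other event reserved space in between. A compressed userspace model of just that check (hypothetical names, no atomics or page wrapping; an interrupting event is modeled as a nested call):

#include <stdio.h>

static unsigned long page_write;	/* stands in for tail_page->write */
static unsigned long long clock_now, before_stamp, write_stamp;

static void reserve(unsigned long len, int interrupted)
{
	unsigned long w, write, tail;
	unsigned long long ts;

	w = page_write;				/* A */
	ts = ++clock_now;
	before_stamp = ts;			/* B */
	if (interrupted)			/* model an event nesting in here */
		reserve(8, 0);
	write = page_write += len;		/* C */
	tail = write - len;

	if (tail == w) {
		write_stamp = ts;		/* fast path: nothing ran between A and C */
		printf("fast path, ts=%llu\n", ts);
	} else {
		printf("slow path: interrupted (tail=%lu, w=%lu)\n", tail, w);
	}
}

int main(void)
{
	reserve(16, 0);		/* uncontended reserve */
	reserve(16, 1);		/* a nested event forces the slow path */
	return 0;
}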
@@ -2944,9 +3277,9 @@ rb_reserve_next_event(struct trace_buffer *buffer,
        struct ring_buffer_event *event;
        struct rb_event_info info;
        int nr_loops = 0;
-       u64 diff;
 
        rb_start_commit(cpu_buffer);
+       /* The commit page can not change after this */
 
 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
        /*
@@ -2965,7 +3298,7 @@ rb_reserve_next_event(struct trace_buffer *buffer,
 
        info.length = rb_calculate_event_length(length);
  again:
-       info.add_timestamp = 0;
+       info.add_timestamp = RB_ADD_STAMP_NONE;
        info.delta = 0;
 
        /*
@@ -2980,22 +3313,6 @@ rb_reserve_next_event(struct trace_buffer *buffer,
        if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
                goto out_fail;
 
-       info.ts = rb_time_stamp(cpu_buffer->buffer);
-       diff = info.ts - cpu_buffer->write_stamp;
-
-       /* make sure this diff is calculated here */
-       barrier();
-
-       if (ring_buffer_time_stamp_abs(buffer)) {
-               info.delta = info.ts;
-               rb_handle_timestamp(cpu_buffer, &info);
-       } else /* Did the write stamp get updated already? */
-               if (likely(info.ts >= cpu_buffer->write_stamp)) {
-               info.delta = diff;
-               if (unlikely(test_time_stamp(info.delta)))
-                       rb_handle_timestamp(cpu_buffer, &info);
-       }
-
        event = __rb_reserve_next(cpu_buffer, &info);
 
        if (unlikely(PTR_ERR(event) == -EAGAIN)) {
@@ -3004,11 +3321,8 @@ rb_reserve_next_event(struct trace_buffer *buffer,
                goto again;
        }
 
-       if (!event)
-               goto out_fail;
-
-       return event;
-
+       if (likely(event))
+               return event;
  out_fail:
        rb_end_commit(cpu_buffer);
        return NULL;
@@ -3154,11 +3468,6 @@ void ring_buffer_discard_commit(struct trace_buffer *buffer,
        if (rb_try_to_discard(cpu_buffer, event))
                goto out;
 
-       /*
-        * The commit is still visible by the reader, so we
-        * must still update the timestamp.
-        */
-       rb_update_write_stamp(cpu_buffer, event);
  out:
        rb_end_commit(cpu_buffer);
 
@@ -4475,8 +4784,8 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
        cpu_buffer->read = 0;
        cpu_buffer->read_bytes = 0;
 
-       cpu_buffer->write_stamp = 0;
-       cpu_buffer->read_stamp = 0;
+       rb_time_set(&cpu_buffer->write_stamp, 0);
+       rb_time_set(&cpu_buffer->before_stamp, 0);
 
        cpu_buffer->lost_events = 0;
        cpu_buffer->last_overrun = 0;
@@ -4484,6 +4793,26 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
        rb_head_page_activate(cpu_buffer);
 }
 
+/* Must have disabled the cpu buffer then done a synchronize_rcu */
+static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
+{
+       unsigned long flags;
+
+       raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+       if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
+               goto out;
+
+       arch_spin_lock(&cpu_buffer->lock);
+
+       rb_reset_cpu(cpu_buffer);
+
+       arch_spin_unlock(&cpu_buffer->lock);
+
+ out:
+       raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+}
+
 /**
  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
  * @buffer: The ring buffer to reset a per cpu buffer of
@@ -4492,7 +4821,6 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu)
 {
        struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
-       unsigned long flags;
 
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                return;
@@ -4503,24 +4831,42 @@ void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu)
        /* Make sure all commits have finished */
        synchronize_rcu();
 
-       raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+       reset_disabled_cpu_buffer(cpu_buffer);
 
-       if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
-               goto out;
+       atomic_dec(&cpu_buffer->record_disabled);
+       atomic_dec(&cpu_buffer->resize_disabled);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
 
-       arch_spin_lock(&cpu_buffer->lock);
+/**
+ * ring_buffer_reset_online_cpus - reset all online per CPU buffers of a ring buffer
+ * @buffer: The ring buffer to reset the online per CPU buffers of
+ */
+void ring_buffer_reset_online_cpus(struct trace_buffer *buffer)
+{
+       struct ring_buffer_per_cpu *cpu_buffer;
+       int cpu;
 
-       rb_reset_cpu(cpu_buffer);
+       for_each_online_buffer_cpu(buffer, cpu) {
+               cpu_buffer = buffer->buffers[cpu];
 
-       arch_spin_unlock(&cpu_buffer->lock);
+               atomic_inc(&cpu_buffer->resize_disabled);
+               atomic_inc(&cpu_buffer->record_disabled);
+       }
 
- out:
-       raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+       /* Make sure all commits have finished */
+       synchronize_rcu();
 
-       atomic_dec(&cpu_buffer->record_disabled);
-       atomic_dec(&cpu_buffer->resize_disabled);
+       for_each_online_buffer_cpu(buffer, cpu) {
+               cpu_buffer = buffer->buffers[cpu];
+
+               reset_disabled_cpu_buffer(cpu_buffer);
+
+               atomic_dec(&cpu_buffer->record_disabled);
+               atomic_dec(&cpu_buffer->resize_disabled);
+       }
 }
-EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
 
 /**
  * ring_buffer_reset - reset a ring buffer
@@ -4528,10 +4874,27 @@ EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
  */
 void ring_buffer_reset(struct trace_buffer *buffer)
 {
+       struct ring_buffer_per_cpu *cpu_buffer;
        int cpu;
 
-       for_each_buffer_cpu(buffer, cpu)
-               ring_buffer_reset_cpu(buffer, cpu);
+       for_each_buffer_cpu(buffer, cpu) {
+               cpu_buffer = buffer->buffers[cpu];
+
+               atomic_inc(&cpu_buffer->resize_disabled);
+               atomic_inc(&cpu_buffer->record_disabled);
+       }
+
+       /* Make sure all commits have finished */
+       synchronize_rcu();
+
+       for_each_buffer_cpu(buffer, cpu) {
+               cpu_buffer = buffer->buffers[cpu];
+
+               reset_disabled_cpu_buffer(cpu_buffer);
+
+               atomic_dec(&cpu_buffer->record_disabled);
+               atomic_dec(&cpu_buffer->resize_disabled);
+       }
 }
 EXPORT_SYMBOL_GPL(ring_buffer_reset);
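The reset rework above amounts to paying the RCU grace period once for the whole buffer rather than once per CPU. A schematic of that before/after shape, using stub functions so it builds as ordinary C (none of these names are the kernel API):

#include <stdio.h>

#define NR_CPUS 4

/* stubs standing in for the real per-CPU disable/reset primitives */
static void disable_cpu(int cpu)	{ printf("disable cpu %d\n", cpu); }
static void reset_and_enable(int cpu)	{ printf("reset   cpu %d\n", cpu); }
static void wait_grace_period(void)	{ puts("-- synchronize_rcu() --"); }

int main(void)
{
	int cpu;

	/* Old scheme: ring_buffer_reset() waited once per CPU. */
	for (cpu = 0; cpu < NR_CPUS; cpu++) {
		disable_cpu(cpu);
		wait_grace_period();
		reset_and_enable(cpu);
	}

	/* New scheme: disable everything, wait once, then reset all CPUs. */
	for (cpu = 0; cpu < NR_CPUS; cpu++)
		disable_cpu(cpu);
	wait_grace_period();
	for (cpu = 0; cpu < NR_CPUS; cpu++)
		reset_and_enable(cpu);

	return 0;
}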