ring-buffer: Have rb_iter_head_event() handle concurrent writer
author		Steven Rostedt (VMware) <rostedt@goodmis.org>
		Tue, 17 Mar 2020 21:32:27 +0000 (17:32 -0400)
committer	Steven Rostedt (VMware) <rostedt@goodmis.org>
		Thu, 19 Mar 2020 23:11:19 +0000 (19:11 -0400)
Give the ring_buffer_iter structure a place to store an event, so that
the stored copy cannot be overwritten by a writer, and have
rb_iter_head_event() load it in such a way that it returns NULL and
resets the iter to the start of the current page if a writer updated
the page.

Link: http://lkml.kernel.org/r/20200317213416.306959216@goodmis.org
Signed-off-by: Steven Rostedt (VMware) <rostedt@goodmis.org>
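
For context, here is a minimal, stand-alone C sketch of the
copy-then-revalidate idea described above. The names (page_sketch,
iter_sketch, iter_head_event, EV_MAX) and the use of C11 atomics in
place of the kernel's barriers are assumptions for illustration only;
the actual kernel change follows in the diff below.

#include <stdatomic.h>
#include <stdint.h>
#include <string.h>

#define EV_MAX 64

struct page_sketch {
	_Atomic uint64_t time_stamp;	/* writer bumps this when it (re)enters the page */
	_Atomic unsigned long commit;	/* bytes committed on this page */
	unsigned char data[4096];
};

struct iter_sketch {
	unsigned long head;		/* read offset within the page */
	uint64_t page_stamp;		/* stamp seen when the iterator entered the page */
	unsigned char event[EV_MAX];	/* private copy the writer cannot touch */
};

/* Return a pointer to the private copy, or NULL if a writer interfered. */
static void *iter_head_event(struct iter_sketch *it, struct page_sketch *pg,
			     unsigned long len)
{
	unsigned long commit = atomic_load(&pg->commit);

	/* Event runs past the committed data: the writer corrupted the read. */
	if (it->head + len > commit || len > EV_MAX)
		return NULL;

	/* Snapshot the event into storage the writer can never overwrite. */
	memcpy(it->event, pg->data + it->head, len);

	/* Read barrier, standing in for the kernel's smp_rmb(). */
	atomic_thread_fence(memory_order_acquire);

	/*
	 * If the page stamp changed or the commit went backwards (page was
	 * recycled), a writer entered the page while we copied: discard.
	 */
	if (it->page_stamp != atomic_load(&pg->time_stamp) ||
	    commit > atomic_load(&pg->commit))
		return NULL;

	return it->event;
}
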
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index e689bdc..3d718ad 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -503,11 +503,13 @@ struct trace_buffer {
 struct ring_buffer_iter {
        struct ring_buffer_per_cpu      *cpu_buffer;
        unsigned long                   head;
+       unsigned long                   next_event;
        struct buffer_page              *head_page;
        struct buffer_page              *cache_reader_page;
        unsigned long                   cache_read;
        u64                             read_stamp;
        u64                             page_stamp;
+       struct ring_buffer_event        *event;
 };
 
 /**
@@ -1914,15 +1916,59 @@ rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
                               cpu_buffer->reader_page->read);
 }
 
-static __always_inline struct ring_buffer_event *
-rb_iter_head_event(struct ring_buffer_iter *iter)
+static __always_inline unsigned rb_page_commit(struct buffer_page *bpage)
 {
-       return __rb_page_index(iter->head_page, iter->head);
+       return local_read(&bpage->page->commit);
 }
 
-static __always_inline unsigned rb_page_commit(struct buffer_page *bpage)
+static struct ring_buffer_event *
+rb_iter_head_event(struct ring_buffer_iter *iter)
 {
-       return local_read(&bpage->page->commit);
+       struct ring_buffer_event *event;
+       struct buffer_page *iter_head_page = iter->head_page;
+       unsigned long commit;
+       unsigned length;
+
+       /*
+        * When the writer goes across pages, it issues a cmpxchg which
+        * is a mb(), which will synchronize with the rmb here.
+        * (see rb_tail_page_update() and __rb_reserve_next())
+        */
+       commit = rb_page_commit(iter_head_page);
+       smp_rmb();
+       event = __rb_page_index(iter_head_page, iter->head);
+       length = rb_event_length(event);
+
+       /*
+        * READ_ONCE() doesn't work on functions and we don't want the
+        * compiler doing any crazy optimizations with length.
+        */
+       barrier();
+
+       if ((iter->head + length) > commit || length > BUF_MAX_DATA_SIZE)
+               /* Writer corrupted the read? */
+               goto reset;
+
+       memcpy(iter->event, event, length);
+       /*
+        * If the page stamp is still the same after this rmb() then the
+        * event was safely copied without the writer entering the page.
+        */
+       smp_rmb();
+
+       /* Make sure the page didn't change since we read this */
+       if (iter->page_stamp != iter_head_page->page->time_stamp ||
+           commit > rb_page_commit(iter_head_page))
+               goto reset;
+
+       iter->next_event = iter->head + length;
+       return iter->event;
+ reset:
+       /* Reset to the beginning */
+       iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
+       iter->head = 0;
+       iter->next_event = 0;
+       return NULL;
 }
 
 /* Size is determined by what has been committed */
@@ -1962,6 +2008,7 @@ static void rb_inc_iter(struct ring_buffer_iter *iter)
 
        iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
        iter->head = 0;
+       iter->next_event = 0;
 }
 
 /*
@@ -3548,6 +3595,7 @@ static void rb_iter_reset(struct ring_buffer_iter *iter)
        /* Iterator usage is expected to have record disabled */
        iter->head_page = cpu_buffer->reader_page;
        iter->head = cpu_buffer->reader_page->read;
+       iter->next_event = iter->head;
 
        iter->cache_reader_page = iter->head_page;
        iter->cache_read = cpu_buffer->read;
@@ -3625,7 +3673,7 @@ int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
                return 0;
 
        /* Still racy, as it may return a false positive, but that's OK */
-       return ((iter->head_page == commit_page && iter->head == commit) ||
+       return ((iter->head_page == commit_page && iter->head >= commit) ||
                (iter->head_page == reader && commit_page == head_page &&
                 head_page->read == commit &&
                 iter->head == rb_page_commit(cpu_buffer->reader_page)));
@@ -3853,15 +3901,22 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
 static void rb_advance_iter(struct ring_buffer_iter *iter)
 {
        struct ring_buffer_per_cpu *cpu_buffer;
-       struct ring_buffer_event *event;
-       unsigned length;
 
        cpu_buffer = iter->cpu_buffer;
 
+       /* If head == next_event then we need to jump to the next event */
+       if (iter->head == iter->next_event) {
+               /* If the event gets overwritten again, there's nothing to do */
+               if (rb_iter_head_event(iter) == NULL)
+                       return;
+       }
+
+       iter->head = iter->next_event;
+
        /*
         * Check if we are at the end of the buffer.
         */
-       if (iter->head >= rb_page_size(iter->head_page)) {
+       if (iter->next_event >= rb_page_size(iter->head_page)) {
                /* discarded commits can make the page empty */
                if (iter->head_page == cpu_buffer->commit_page)
                        return;
@@ -3869,27 +3924,7 @@ static void rb_advance_iter(struct ring_buffer_iter *iter)
                return;
        }
 
-       event = rb_iter_head_event(iter);
-
-       length = rb_event_length(event);
-
-       /*
-        * This should not be called to advance the header if we are
-        * at the tail of the buffer.
-        */
-       if (RB_WARN_ON(cpu_buffer,
-                      (iter->head_page == cpu_buffer->commit_page) &&
-                      (iter->head + length > rb_commit_index(cpu_buffer))))
-               return;
-
-       rb_update_iter_read_stamp(iter, event);
-
-       iter->head += length;
-
-       /* check for end of page padding */
-       if ((iter->head >= rb_page_size(iter->head_page)) &&
-           (iter->head_page != cpu_buffer->commit_page))
-               rb_inc_iter(iter);
+       rb_update_iter_read_stamp(iter, iter->event);
 }
 
 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
@@ -4017,6 +4052,8 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
        }
 
        event = rb_iter_head_event(iter);
+       if (!event)
+               goto again;
 
        switch (event->type_len) {
        case RINGBUF_TYPE_PADDING:
@@ -4233,10 +4270,16 @@ ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                return NULL;
 
-       iter = kmalloc(sizeof(*iter), flags);
+       iter = kzalloc(sizeof(*iter), flags);
        if (!iter)
                return NULL;
 
+       iter->event = kmalloc(BUF_MAX_DATA_SIZE, flags);
+       if (!iter->event) {
+               kfree(iter);
+               return NULL;
+       }
+
        cpu_buffer = buffer->buffers[cpu];
 
        iter->cpu_buffer = cpu_buffer;
@@ -4317,6 +4360,7 @@ ring_buffer_read_finish(struct ring_buffer_iter *iter)
 
        atomic_dec(&cpu_buffer->record_disabled);
        atomic_dec(&cpu_buffer->buffer->resize_disabled);
+       kfree(iter->event);
        kfree(iter);
 }
 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
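
As a usage note, here is a hedged sketch of how a consumer built on the
copy-and-revalidate helper from the sketch above might handle the NULL
return: retry from the start of the page rather than treating it as end
of data, much as rb_iter_peek() re-reads via its "goto again" and
rb_advance_iter() simply returns. handle_event() and the fixed 8-byte
event length are assumptions for illustration only.

static void drain_page(struct iter_sketch *it, struct page_sketch *pg,
		       void (*handle_event)(const void *ev, unsigned long len))
{
	const unsigned long len = 8;
	int retries = 3;	/* bound the retries, as rb_iter_peek() does */

	while (it->head + len <= atomic_load(&pg->commit) && retries) {
		void *ev = iter_head_event(it, pg, len);

		if (!ev) {
			/* Writer moved under us: restart from the page head. */
			it->head = 0;
			it->page_stamp = atomic_load(&pg->time_stamp);
			retries--;
			continue;
		}
		handle_event(ev, len);
		it->head += len;	/* advance past the consumed copy */
	}
}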