* Optimistic spinning by Tim Chen <tim.c.chen@intel.com>
* and Davidlohr Bueso <davidlohr@hp.com>. Based on mutexes.
*
- * Rwsem count bit fields re-definition and rwsem rearchitecture
- * by Waiman Long <longman@redhat.com>.
+ * Rwsem count bit fields re-definition and rwsem rearchitecture by
+ * Waiman Long <longman@redhat.com> and
+ * Peter Zijlstra <peterz@infradead.org>.
*/
#include <linux/types.h>
*
* Bit 0 - writer locked bit
* Bit 1 - waiters present bit
- * Bits 2-7 - reserved
+ * Bit 2 - lock handoff bit
+ * Bits 3-7 - reserved
* Bits 8-X - 24-bit (32-bit) or 56-bit reader count
*
* atomic_long_fetch_add() is used to obtain reader lock, whereas
* atomic_long_cmpxchg() will be used to obtain writer lock.
+ *
+ * There are three places where the lock handoff bit may be set or cleared.
+ * 1) rwsem_mark_wake() for readers.
+ * 2) rwsem_try_write_lock() for writers.
+ * 3) Error path of rwsem_down_write_slowpath().
+ *
+ * For all the above cases, wait_lock will be held. A writer must also
+ * be the first one in the wait_list to be eligible for setting the handoff
+ * bit. So concurrent setting/clearing of handoff bit is not possible.
*/
#define RWSEM_WRITER_LOCKED (1UL << 0)
#define RWSEM_FLAG_WAITERS (1UL << 1)
+#define RWSEM_FLAG_HANDOFF (1UL << 2)
+
#define RWSEM_READER_SHIFT 8
#define RWSEM_READER_BIAS (1UL << RWSEM_READER_SHIFT)
#define RWSEM_READER_MASK (~(RWSEM_READER_BIAS - 1))
#define RWSEM_WRITER_MASK RWSEM_WRITER_LOCKED
#define RWSEM_LOCK_MASK (RWSEM_WRITER_MASK|RWSEM_READER_MASK)
-#define RWSEM_READ_FAILED_MASK (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS)
+#define RWSEM_READ_FAILED_MASK (RWSEM_WRITER_MASK|RWSEM_FLAG_WAITERS|\
+ RWSEM_FLAG_HANDOFF)
/*
* All writes to owner are protected by WRITE_ONCE() to make sure that
struct list_head list;
struct task_struct *task;
enum rwsem_waiter_type type;
+ unsigned long timeout;
};
+#define rwsem_first_waiter(sem) \
+ list_first_entry(&sem->wait_list, struct rwsem_waiter, list)
enum rwsem_wake_type {
RWSEM_WAKE_ANY, /* Wake whatever's at head of wait list */
RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */
};
+enum writer_wait_state {
+ WRITER_NOT_FIRST, /* Writer is not first in wait list */
+ WRITER_FIRST, /* Writer is first in wait list */
+ WRITER_HANDOFF /* Writer is first & handoff needed */
+};
+
+/*
+ * The typical HZ value is either 250 or 1000. So set the minimum waiting
+ * time to at least 4ms or 1 jiffy (if it is higher than 4ms) in the wait
+ * queue before initiating the handoff protocol.
+ */
+#define RWSEM_WAIT_TIMEOUT DIV_ROUND_UP(HZ, 250)
+
/*
* handle the lock release when processes blocked on it that can now run
* - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
long oldcount, woken = 0, adjustment = 0;
struct list_head wlist;
+ lockdep_assert_held(&sem->wait_lock);
+
/*
* Take a peek at the queue head waiter such that we can determine
* the wakeup(s) to perform.
*/
- waiter = list_first_entry(&sem->wait_list, struct rwsem_waiter, list);
+ waiter = rwsem_first_waiter(sem);
if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
if (wake_type == RWSEM_WAKE_ANY) {
adjustment = RWSEM_READER_BIAS;
oldcount = atomic_long_fetch_add(adjustment, &sem->count);
if (unlikely(oldcount & RWSEM_WRITER_MASK)) {
- atomic_long_sub(adjustment, &sem->count);
+ /*
+ * When we've been waiting "too" long (for writers
+ * to give up the lock), request a HANDOFF to
+ * force the issue.
+ */
+ if (!(oldcount & RWSEM_FLAG_HANDOFF) &&
+ time_after(jiffies, waiter->timeout)) {
+ adjustment -= RWSEM_FLAG_HANDOFF;
+ lockevent_inc(rwsem_rlock_handoff);
+ }
+
+ atomic_long_add(-adjustment, &sem->count);
return;
}
/*
adjustment -= RWSEM_FLAG_WAITERS;
}
+ /*
+ * When we've woken a reader, we no longer need to force writers
+ * to give up the lock and we can clear HANDOFF.
+ */
+ if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
+ adjustment -= RWSEM_FLAG_HANDOFF;
+
if (adjustment)
atomic_long_add(adjustment, &sem->count);
* This function must be called with the sem->wait_lock held to prevent
* race conditions between checking the rwsem wait list and setting the
* sem->count accordingly.
+ *
+ * If wstate is WRITER_HANDOFF, it will make sure that either the handoff
+ * bit is set or the lock is acquired with handoff bit cleared.
*/
-static inline bool rwsem_try_write_lock(long count, struct rw_semaphore *sem)
+static inline bool rwsem_try_write_lock(long count, struct rw_semaphore *sem,
+ enum writer_wait_state wstate)
{
long new;
- if (count & RWSEM_LOCK_MASK)
- return false;
+ lockdep_assert_held(&sem->wait_lock);
- new = count + RWSEM_WRITER_LOCKED -
- (list_is_singular(&sem->wait_list) ? RWSEM_FLAG_WAITERS : 0);
+ do {
+ bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
- if (atomic_long_try_cmpxchg_acquire(&sem->count, &count, new)) {
- rwsem_set_owner(sem);
- return true;
- }
+ if (has_handoff && wstate == WRITER_NOT_FIRST)
+ return false;
- return false;
+ new = count;
+
+ if (count & RWSEM_LOCK_MASK) {
+ if (has_handoff || (wstate != WRITER_HANDOFF))
+ return false;
+
+ new |= RWSEM_FLAG_HANDOFF;
+ } else {
+ new |= RWSEM_WRITER_LOCKED;
+ new &= ~RWSEM_FLAG_HANDOFF;
+
+ if (list_is_singular(&sem->wait_list))
+ new &= ~RWSEM_FLAG_WAITERS;
+ }
+ } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
+
+ /*
+ * We have either acquired the lock with handoff bit cleared or
+ * set the handoff bit.
+ */
+ if (new & RWSEM_FLAG_HANDOFF)
+ return false;
+
+ rwsem_set_owner(sem);
+ return true;
}
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
{
long count = atomic_long_read(&sem->count);
- while (!(count & RWSEM_LOCK_MASK)) {
+ while (!(count & (RWSEM_LOCK_MASK|RWSEM_FLAG_HANDOFF))) {
if (atomic_long_try_cmpxchg_acquire(&sem->count, &count,
- count + RWSEM_WRITER_LOCKED)) {
+ count | RWSEM_WRITER_LOCKED)) {
rwsem_set_owner(sem);
lockevent_inc(rwsem_opt_wlock);
return true;
rcu_read_lock();
for (;;) {
+ if (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF) {
+ state = OWNER_NONSPINNABLE;
+ break;
+ }
+
tmp = READ_ONCE(sem->owner);
if (tmp != owner) {
state = rwsem_owner_state((unsigned long)tmp);
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_READ;
+ waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
raw_spin_lock_irq(&sem->wait_lock);
if (list_empty(&sem->wait_list)) {
/*
* In case the wait queue is empty and the lock isn't owned
- * by a writer, this reader can exit the slowpath and return
- * immediately as its RWSEM_READER_BIAS has already been
- * set in the count.
+ * by a writer or has the handoff bit set, this reader can
+ * exit the slowpath and return immediately as its
+ * RWSEM_READER_BIAS has already been set in the count.
*/
- if (!(atomic_long_read(&sem->count) & RWSEM_WRITER_MASK)) {
+ if (!(atomic_long_read(&sem->count) &
+ (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {
raw_spin_unlock_irq(&sem->wait_lock);
rwsem_set_reader_owned(sem);
lockevent_inc(rwsem_rlock_fast);
return sem;
out_nolock:
list_del(&waiter.list);
- if (list_empty(&sem->wait_list))
- atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
+ if (list_empty(&sem->wait_list)) {
+ atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,
+ &sem->count);
+ }
raw_spin_unlock_irq(&sem->wait_lock);
__set_current_state(TASK_RUNNING);
lockevent_inc(rwsem_rlock_fail);
rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
{
long count;
- bool waiting = true; /* any queued threads before us */
+ enum writer_wait_state wstate;
struct rwsem_waiter waiter;
struct rw_semaphore *ret = sem;
DEFINE_WAKE_Q(wake_q);
*/
waiter.task = current;
waiter.type = RWSEM_WAITING_FOR_WRITE;
+ waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
raw_spin_lock_irq(&sem->wait_lock);
/* account for this before adding a new element to the list */
- if (list_empty(&sem->wait_list))
- waiting = false;
+ wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;
list_add_tail(&waiter.list, &sem->wait_list);
/* we're now waiting on the lock */
- if (waiting) {
+ if (wstate == WRITER_NOT_FIRST) {
count = atomic_long_read(&sem->count);
/*
- * If there were already threads queued before us and there are
- * no active writers and some readers, the lock must be read
- * owned; so we try to any read locks that were queued ahead
- * of us.
+ * If there were already threads queued before us and:
+ * 1) there are no no active locks, wake the front
+ * queued process(es) as the handoff bit might be set.
+ * 2) there are no active writers and some readers, the lock
+ * must be read owned; so we try to wake any read lock
+ * waiters that were queued ahead of us.
*/
- if (!(count & RWSEM_WRITER_MASK) &&
- (count & RWSEM_READER_MASK)) {
- rwsem_mark_wake(sem, RWSEM_WAKE_READERS, &wake_q);
- /*
- * The wakeup is normally called _after_ the wait_lock
- * is released, but given that we are proactively waking
- * readers we can deal with the wake_q overhead as it is
- * similar to releasing and taking the wait_lock again
- * for attempting rwsem_try_write_lock().
- */
- wake_up_q(&wake_q);
+ if (count & RWSEM_WRITER_MASK)
+ goto wait;
- /*
- * Reinitialize wake_q after use.
- */
- wake_q_init(&wake_q);
- }
+ rwsem_mark_wake(sem, (count & RWSEM_READER_MASK)
+ ? RWSEM_WAKE_READERS
+ : RWSEM_WAKE_ANY, &wake_q);
+ /*
+ * The wakeup is normally called _after_ the wait_lock
+ * is released, but given that we are proactively waking
+ * readers we can deal with the wake_q overhead as it is
+ * similar to releasing and taking the wait_lock again
+ * for attempting rwsem_try_write_lock().
+ */
+ wake_up_q(&wake_q);
+
+ /* We need wake_q again below, reinitialize */
+ wake_q_init(&wake_q);
} else {
count = atomic_long_add_return(RWSEM_FLAG_WAITERS, &sem->count);
}
+wait:
/* wait until we successfully acquire the lock */
set_current_state(state);
while (true) {
- if (rwsem_try_write_lock(count, sem))
+ if (rwsem_try_write_lock(count, sem, wstate))
break;
+
raw_spin_unlock_irq(&sem->wait_lock);
/* Block until there are no active lockers. */
- do {
+ for (;;) {
if (signal_pending_state(state, current))
goto out_nolock;
schedule();
lockevent_inc(rwsem_sleep_writer);
set_current_state(state);
+ /*
+ * If HANDOFF bit is set, unconditionally do
+ * a trylock.
+ */
+ if (wstate == WRITER_HANDOFF)
+ break;
+
+ if ((wstate == WRITER_NOT_FIRST) &&
+ (rwsem_first_waiter(sem) == &waiter))
+ wstate = WRITER_FIRST;
+
count = atomic_long_read(&sem->count);
- } while (count & RWSEM_LOCK_MASK);
+ if (!(count & RWSEM_LOCK_MASK))
+ break;
+
+ /*
+ * The setting of the handoff bit is deferred
+ * until rwsem_try_write_lock() is called.
+ */
+ if ((wstate == WRITER_FIRST) && (rt_task(current) ||
+ time_after(jiffies, waiter.timeout))) {
+ wstate = WRITER_HANDOFF;
+ lockevent_inc(rwsem_wlock_handoff);
+ break;
+ }
+ }
raw_spin_lock_irq(&sem->wait_lock);
+ count = atomic_long_read(&sem->count);
}
__set_current_state(TASK_RUNNING);
list_del(&waiter.list);
__set_current_state(TASK_RUNNING);
raw_spin_lock_irq(&sem->wait_lock);
list_del(&waiter.list);
+
+ if (unlikely(wstate == WRITER_HANDOFF))
+ atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count);
+
if (list_empty(&sem->wait_list))
atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
else
* handle waking up a waiter on the semaphore
* - up_read/up_write has decremented the active part of count if we come here
*/
-static struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem)
+static struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem, long count)
{
unsigned long flags;
DEFINE_WAKE_Q(wake_q);
tmp = atomic_long_add_return_release(-RWSEM_READER_BIAS, &sem->count);
if (unlikely((tmp & (RWSEM_LOCK_MASK|RWSEM_FLAG_WAITERS)) ==
RWSEM_FLAG_WAITERS))
- rwsem_wake(sem);
+ rwsem_wake(sem, tmp);
}
/*
rwsem_clear_owner(sem);
tmp = atomic_long_fetch_add_release(-RWSEM_WRITER_LOCKED, &sem->count);
if (unlikely(tmp & RWSEM_FLAG_WAITERS))
- rwsem_wake(sem);
+ rwsem_wake(sem, tmp);
}
/*