int refcnt; /* L: reference count */
int nr_in_flight[WORK_NR_COLORS];
/* L: nr of in_flight works */
+ bool plugged; /* L: execution suspended */
/*
* nr_active management and WORK_STRUCT_INACTIVE:
goto out;
}
+ if (unlikely(pwq->plugged))
+ return false;
+
/*
* Unbound workqueue uses per-node shared nr_active $nna. If @pwq is
* already waiting on $nna, pwq_dec_nr_active() will maintain the
}
}
+/**
+ * unplug_oldest_pwq - restart an oldest plugged pool_workqueue
+ * @wq: workqueue_struct to be restarted
+ *
+ * pwq's are linked into wq->pwqs with the oldest first. For ordered
+ * workqueues, only the oldest pwq is unplugged, the others are plugged to
+ * suspend execution until the oldest one is drained. When this happens, the
+ * next oldest one (first plugged pwq in iteration) will be unplugged to
+ * restart work item execution to ensure proper work item ordering.
+ *
+ * dfl_pwq --------------+ [P] - plugged
+ * |
+ * v
+ * pwqs -> A -> B [P] -> C [P] (newest)
+ * | | |
+ * 1 3 5
+ * | | |
+ * 2 4 6
+ */
+static void unplug_oldest_pwq(struct workqueue_struct *wq)
+{
+ struct pool_workqueue *pwq;
+
+ lockdep_assert_held(&wq->mutex);
+
+ /* Caller should make sure that pwqs isn't empty before calling */
+ pwq = list_first_entry_or_null(&wq->pwqs, struct pool_workqueue,
+ pwqs_node);
+ raw_spin_lock_irq(&pwq->pool->lock);
+ if (pwq->plugged) {
+ pwq->plugged = false;
+ if (pwq_activate_first_inactive(pwq, true))
+ kick_pool(pwq->pool);
+ }
+ raw_spin_unlock_irq(&pwq->pool->lock);
+}
+
/**
* node_activate_pending_pwq - Activate a pending pwq on a wq_node_nr_active
* @nna: wq_node_nr_active to activate a pending pwq for
mutex_lock(&wq->mutex);
list_del_rcu(&pwq->pwqs_node);
is_last = list_empty(&wq->pwqs);
+
+ /*
+ * For ordered workqueue with a plugged dfl_pwq, restart it now.
+ */
+ if (!is_last && (wq->flags & __WQ_ORDERED))
+ unplug_oldest_pwq(wq);
+
mutex_unlock(&wq->mutex);
}
cpumask_copy(new_attrs->__pod_cpumask, new_attrs->cpumask);
ctx->attrs = new_attrs;
+ /*
+ * For initialized ordered workqueues, there should only be one pwq
+ * (dfl_pwq). Set the plugged flag of ctx->dfl_pwq to suspend execution
+ * of newly queued work items until execution of older work items in
+ * the old pwq's have completed.
+ */
+ if ((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs))
+ ctx->dfl_pwq->plugged = true;
+
ctx->wq = wq;
return ctx;
if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
return -EINVAL;
- /* creating multiple pwqs breaks ordering guarantee */
- if (!list_empty(&wq->pwqs) && WARN_ON(wq->flags & __WQ_ORDERED))
- return -EINVAL;
-
ctx = apply_wqattrs_prepare(wq, attrs, wq_unbound_cpumask);
if (IS_ERR(ctx))
return PTR_ERR(ctx);
list_for_each_entry(wq, &workqueues, list) {
if (!(wq->flags & WQ_UNBOUND) || (wq->flags & __WQ_DESTROYING))
continue;
- /* creating multiple pwqs breaks ordering guarantee */
- if (wq->flags & __WQ_ORDERED)
- continue;
ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs, unbound_cpumask);
if (IS_ERR(ctx)) {
int ret;
/*
- * Adjusting max_active or creating new pwqs by applying
- * attributes breaks ordering guarantee. Disallow exposing ordered
- * workqueues.
+ * Adjusting max_active breaks ordering guarantee. Disallow exposing
+ * ordered workqueues.
*/
if (WARN_ON(wq->flags & __WQ_ORDERED))
return -EINVAL;