}
EXPORT_SYMBOL(xsk_tx_peek_desc);
+static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, struct xdp_desc *descs,
+ u32 max_entries)
+{
+ u32 nb_pkts = 0;
+
+ while (nb_pkts < max_entries && xsk_tx_peek_desc(pool, &descs[nb_pkts]))
+ nb_pkts++;
+
+ xsk_tx_release(pool);
+ return nb_pkts;
+}
+
+u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *descs,
+ u32 max_entries)
+{
+ struct xdp_sock *xs;
+ u32 nb_pkts;
+
+ rcu_read_lock();
+ if (!list_is_singular(&pool->xsk_tx_list)) {
+ /* Fallback to the non-batched version */
+ rcu_read_unlock();
+ return xsk_tx_peek_release_fallback(pool, descs, max_entries);
+ }
+
+ xs = list_first_or_null_rcu(&pool->xsk_tx_list, struct xdp_sock, tx_list);
+ if (!xs) {
+ nb_pkts = 0;
+ goto out;
+ }
+
+ nb_pkts = xskq_cons_peek_desc_batch(xs->tx, descs, pool, max_entries);
+ if (!nb_pkts) {
+ xs->tx->queue_empty_descs++;
+ goto out;
+ }
+
+ /* This is the backpressure mechanism for the Tx path. Try to
+ * reserve space in the completion queue for all packets, but
+ * if there are fewer slots available, just process that many
+ * packets. This avoids having to implement any buffering in
+ * the Tx path.
+ */
+ nb_pkts = xskq_prod_reserve_addr_batch(pool->cq, descs, nb_pkts);
+ if (!nb_pkts)
+ goto out;
+
+ xskq_cons_release_n(xs->tx, nb_pkts);
+ __xskq_cons_release(xs->tx);
+ xs->sk.sk_write_space(&xs->sk);
+
+out:
+ rcu_read_unlock();
+ return nb_pkts;
+}
+EXPORT_SYMBOL(xsk_tx_peek_release_desc_batch);
+
static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
{
struct net_device *dev = xs->dev;
return false;
}
+static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
+ struct xdp_desc *descs,
+ struct xsk_buff_pool *pool, u32 max)
+{
+ u32 cached_cons = q->cached_cons, nb_entries = 0;
+
+ while (cached_cons != q->cached_prod && nb_entries < max) {
+ struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
+ u32 idx = cached_cons & q->ring_mask;
+
+ descs[nb_entries] = ring->desc[idx];
+ if (unlikely(!xskq_cons_is_valid_desc(q, &descs[nb_entries], pool))) {
+ /* Skip the entry */
+ cached_cons++;
+ continue;
+ }
+
+ nb_entries++;
+ cached_cons++;
+ }
+
+ return nb_entries;
+}
+
/* Functions for consumers */
static inline void __xskq_cons_release(struct xsk_queue *q)
__xskq_cons_peek(q);
}
-static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
+static inline u32 xskq_cons_nb_entries(struct xsk_queue *q, u32 max)
{
u32 entries = q->cached_prod - q->cached_cons;
- if (entries >= cnt)
- return true;
+ if (entries >= max)
+ return max;
__xskq_cons_peek(q);
entries = q->cached_prod - q->cached_cons;
- return entries >= cnt;
+ return entries >= max ? max : entries;
+}
+
+static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
+{
+ return xskq_cons_nb_entries(q, cnt) >= cnt ? true : false;
}
static inline bool xskq_cons_peek_addr_unchecked(struct xsk_queue *q, u64 *addr)
return xskq_cons_read_desc(q, desc, pool);
}
+static inline u32 xskq_cons_peek_desc_batch(struct xsk_queue *q, struct xdp_desc *descs,
+ struct xsk_buff_pool *pool, u32 max)
+{
+ u32 entries = xskq_cons_nb_entries(q, max);
+
+ return xskq_cons_read_desc_batch(q, descs, pool, entries);
+}
+
+/* To improve performance in the xskq_cons_release functions, only update local state here.
+ * Reflect this to global state when we get new entries from the ring in
+ * xskq_cons_get_entries() and whenever Rx or Tx processing are completed in the NAPI loop.
+ */
static inline void xskq_cons_release(struct xsk_queue *q)
{
- /* To improve performance, only update local state here.
- * Reflect this to global state when we get new entries
- * from the ring in xskq_cons_get_entries() and whenever
- * Rx or Tx processing are completed in the NAPI loop.
- */
q->cached_cons++;
}
+static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
+{
+ q->cached_cons += cnt;
+}
+
static inline bool xskq_cons_is_full(struct xsk_queue *q)
{
/* No barriers needed since data is not accessed */
/* Functions for producers */
-static inline bool xskq_prod_is_full(struct xsk_queue *q)
+static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
{
u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
- if (free_entries)
- return false;
+ if (free_entries >= max)
+ return max;
/* Refresh the local tail pointer */
q->cached_cons = READ_ONCE(q->ring->consumer);
free_entries = q->nentries - (q->cached_prod - q->cached_cons);
- return !free_entries;
+ return free_entries >= max ? max : free_entries;
+}
+
+static inline bool xskq_prod_is_full(struct xsk_queue *q)
+{
+ return xskq_prod_nb_free(q, 1) ? false : true;
}
static inline int xskq_prod_reserve(struct xsk_queue *q)
return 0;
}
+static inline u32 xskq_prod_reserve_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
+ u32 max)
+{
+ struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+ u32 nb_entries, i, cached_prod;
+
+ nb_entries = xskq_prod_nb_free(q, max);
+
+ /* A, matches D */
+ cached_prod = q->cached_prod;
+ for (i = 0; i < nb_entries; i++)
+ ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr;
+ q->cached_prod = cached_prod;
+
+ return nb_entries;
+}
+
static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
u64 addr, u32 len)
{