/* SPDX-License-Identifier: GPL-2.0 */
/* XDP user-space ring structure
 * Copyright(c) 2018 Intel Corporation.
 */

#ifndef _LINUX_XSK_QUEUE_H
#define _LINUX_XSK_QUEUE_H

#include <linux/types.h>
#include <linux/if_xdp.h>
#include <net/xdp_sock.h>
#include <net/xsk_buff_pool.h>

struct xdp_ring {
	u32 producer ____cacheline_aligned_in_smp;
	u32 consumer ____cacheline_aligned_in_smp;
};

/* Used for the RX and TX queues for packets */
struct xdp_rxtx_ring {
	struct xdp_ring ptrs;
	struct xdp_desc desc[] ____cacheline_aligned_in_smp;
};

/* Used for the fill and completion queues for buffers */
struct xdp_umem_ring {
	struct xdp_ring ptrs;
	u64 desc[] ____cacheline_aligned_in_smp;
};

struct xsk_queue {
	u32 ring_mask;
	u32 nentries;
	u32 cached_prod;
	u32 cached_cons;
	struct xdp_ring *ring;
	u64 invalid_descs;
};

/* The structure of the shared state of the rings is the same as the
 * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
 * ring, the kernel is the producer and user space is the consumer. For
 * the Tx and fill rings, the kernel is the consumer and user space is
 * the producer.
 *
 * producer                         consumer
 *
 * if (LOAD ->consumer) {           LOAD ->producer
 *                    (A)           smp_rmb()       (C)
 *    STORE $data                   LOAD $data
 *    smp_wmb()       (B)           smp_mb()        (D)
 *    STORE ->producer              STORE ->consumer
 * }
 *
 * (A) pairs with (D), and (B) pairs with (C).
 *
 * Starting with (B), it ensures that the data is written before the
 * producer pointer is updated. If this barrier were missing, the
 * consumer could observe the producer pointer being set and thus load
 * the data before the producer has written the new data. The consumer
 * would in this case load the old data.
 *
 * (C) protects the consumer from speculatively loading the data before
 * the producer pointer actually has been read. If we do not have this
 * barrier, some architectures could load old data as speculative loads
 * are not discarded as the CPU does not know there is a dependency
 * between ->producer and data.
 *
 * (A) is a control dependency that separates the load of ->consumer
 * from the stores of $data. In case ->consumer indicates there is no
 * room in the buffer to store $data, we do not store it. So no barrier
 * is needed.
 *
 * (D) ensures that the data is loaded before the store of the consumer
 * pointer becomes visible. If we did not have this memory barrier, the
 * producer could observe the consumer pointer being set and overwrite
 * the data with a new value before the consumer got the chance to read
 * the old value. The consumer would thus miss reading the old entry
 * and very likely read the new entry twice, once right now and again
 * after circling through the ring.
 */
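
/* How the labels above map onto the helpers below: (A) is the control
 * dependency provided by the xskq_prod_is_full() check at the start of
 * the xskq_prod_reserve*() functions, (B) is the smp_wmb() in
 * __xskq_prod_submit(), (C) is the smp_rmb() in __xskq_cons_peek() and
 * (D) is the smp_mb() in __xskq_cons_release().
 */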

/* The operations on the rings are the following:
 *
 * producer                           consumer
 *
 * RESERVE entries                    PEEK in the ring for entries
 * WRITE data into the ring           READ data from the ring
 * SUBMIT entries                     RELEASE entries
 *
 * The producer reserves one or more entries in the ring. It can then
 * fill in these entries and finally submit them so that they can be
 * seen and read by the consumer.
 *
 * The consumer peeks into the ring to see if the producer has written
 * any new entries. If so, the consumer can then read these entries
 * and when it is done reading them release them back to the producer
 * so that the producer can use these slots to fill in new entries.
 *
 * The function names below reflect these operations.
 */
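
/* Illustrative sketch only, not part of this header: how a kernel-side
 * caller might drive one Rx-ring production and one Tx-ring consumption
 * with the helpers below. The real call sites live in net/xdp/xsk.c and
 * differ in detail; rx_q, tx_q, umem, addr and len are assumed to be
 * set up by the caller.
 *
 *	struct xdp_desc desc;
 *
 *	// producer side (Rx ring): RESERVE + WRITE, then SUBMIT
 *	if (xskq_prod_reserve_desc(rx_q, addr, len))
 *		return -ENOSPC;			// ring full, nothing written
 *	xskq_prod_submit(rx_q);			// make the entry visible
 *
 *	// consumer side (Tx ring): PEEK + READ, then RELEASE
 *	if (xskq_cons_peek_desc(tx_q, &desc, umem)) {
 *		// ... transmit the buffer described by desc ...
 *		xskq_cons_release(tx_q);	// mark it consumed (locally)
 *	}
 */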

/* Functions that read and validate content from consumer rings. */

static inline bool xskq_cons_read_addr_unchecked(struct xsk_queue *q, u64 *addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (q->cached_cons != q->cached_prod) {
		u32 idx = q->cached_cons & q->ring_mask;

		*addr = ring->desc[idx];
		return true;
	}

	return false;
}

static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
					   struct xdp_desc *d,
					   struct xdp_umem *umem)
{
	if (!xp_validate_desc(umem->pool, d)) {
		q->invalid_descs++;
		return false;
	}

	return true;
}

static inline bool xskq_cons_read_desc(struct xsk_queue *q,
				       struct xdp_desc *desc,
				       struct xdp_umem *umem)
{
	while (q->cached_cons != q->cached_prod) {
		struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
		u32 idx = q->cached_cons & q->ring_mask;

		*desc = ring->desc[idx];
		if (xskq_cons_is_valid_desc(q, desc, umem))
			return true;

		/* Skip the invalid descriptor and try the next one */
		q->cached_cons++;
	}

	return false;
}

/* Functions for consumers */

static inline void __xskq_cons_release(struct xsk_queue *q)
{
	smp_mb(); /* D, matches A */
	WRITE_ONCE(q->ring->consumer, q->cached_cons);
}

static inline void __xskq_cons_peek(struct xsk_queue *q)
{
	/* Refresh the local pointer */
	q->cached_prod = READ_ONCE(q->ring->producer);
	smp_rmb(); /* C, matches B */
}

static inline void xskq_cons_get_entries(struct xsk_queue *q)
{
	__xskq_cons_release(q);
	__xskq_cons_peek(q);
}

static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
{
	u32 entries = q->cached_prod - q->cached_cons;

	if (entries >= cnt)
		return true;

	__xskq_cons_peek(q);
	entries = q->cached_prod - q->cached_cons;

	return entries >= cnt;
}
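
/* Sketch only (an assumed caller-side pattern, not taken from xsk.c): when
 * a driver needs cnt fill-queue addresses up front, it can check once and
 * then consume them without further availability tests; fq, cnt, addrs and
 * i are caller state:
 *
 *	if (!xskq_cons_has_entries(fq, cnt))
 *		return false;
 *	for (i = 0; i < cnt; i++) {
 *		xskq_cons_read_addr_unchecked(fq, &addrs[i]);
 *		xskq_cons_release(fq);
 *	}
 */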

static inline bool xskq_cons_peek_addr_unchecked(struct xsk_queue *q, u64 *addr)
{
	if (q->cached_prod == q->cached_cons)
		xskq_cons_get_entries(q);
	return xskq_cons_read_addr_unchecked(q, addr);
}

static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
				       struct xdp_desc *desc,
				       struct xdp_umem *umem)
{
	if (q->cached_prod == q->cached_cons)
		xskq_cons_get_entries(q);
	return xskq_cons_read_desc(q, desc, umem);
}

static inline void xskq_cons_release(struct xsk_queue *q)
{
	/* To improve performance, only update local state here.
	 * Reflect this to global state when we get new entries
	 * from the ring in xskq_cons_get_entries() and whenever
	 * Rx or Tx processing is completed in the NAPI loop.
	 */
	q->cached_cons++;
}
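
/* Sketch only (assumed pattern): several entries can be released locally
 * and then published to the producer in one step, e.g. at the end of a
 * NAPI poll; budget, q, desc and umem are assumed caller state:
 *
 *	while (budget-- && xskq_cons_peek_desc(q, &desc, umem)) {
 *		// ... process desc ...
 *		xskq_cons_release(q);	// local only: cached_cons++
 *	}
 *	__xskq_cons_release(q);		// smp_mb() + WRITE_ONCE(consumer)
 */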

static inline bool xskq_cons_is_full(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */
	return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
		q->nentries;
}

/* Functions for producers */

static inline bool xskq_prod_is_full(struct xsk_queue *q)
{
	u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);

	if (free_entries)
		return false;

	/* Refresh the local tail pointer */
	q->cached_cons = READ_ONCE(q->ring->consumer);
	free_entries = q->nentries - (q->cached_prod - q->cached_cons);

	return !free_entries;
}

static inline int xskq_prod_reserve(struct xsk_queue *q)
{
	if (xskq_prod_is_full(q))
		return -ENOSPC;

	/* A, matches D */
	q->cached_prod++;
	return 0;
}

static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;

	if (xskq_prod_is_full(q))
		return -ENOSPC;

	/* A, matches D */
	ring->desc[q->cached_prod++ & q->ring_mask] = addr;
	return 0;
}

static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
					 u64 addr, u32 len)
{
	struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
	u32 idx;

	if (xskq_prod_is_full(q))
		return -ENOSPC;

	/* A, matches D */
	idx = q->cached_prod++ & q->ring_mask;
	ring->desc[idx].addr = addr;
	ring->desc[idx].len = len;

	return 0;
}

static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
	smp_wmb(); /* B, matches C */

	WRITE_ONCE(q->ring->producer, idx);
}

static inline void xskq_prod_submit(struct xsk_queue *q)
{
	__xskq_prod_submit(q, q->cached_prod);
}

static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
{
	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
	u32 idx = q->ring->producer;

	ring->desc[idx++ & q->ring_mask] = addr;

	__xskq_prod_submit(q, idx);
}

static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
{
	__xskq_prod_submit(q, q->ring->producer + nb_entries);
}
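
/* Sketch only (an assumed, simplified view of a Tx completion flow): a
 * completion-queue slot is reserved before a packet is sent so that
 * reporting the completion later cannot fail, and the buffer address is
 * published once transmission is done; cq and addr are caller state:
 *
 *	if (xskq_prod_reserve(cq))		// at transmit time
 *		return -ENOSPC;			// no room to report a completion
 *	// ... send the packet ...
 *	xskq_prod_submit_addr(cq, addr);	// at completion time
 */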

static inline bool xskq_prod_is_empty(struct xsk_queue *q)
{
	/* No barriers needed since data is not accessed */
	return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
}

/* For both producers and consumers */

static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
{
	return q ? q->invalid_descs : 0;
}

struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);

#endif /* _LINUX_XSK_QUEUE_H */