Merge tag 'for-linus-5.11-ofs1' of git://git.kernel.org/pub/scm/linux/kernel/git...
[linux-2.6-microblaze.git] / samples / bpf / xsk_fwd.c
1 // SPDX-License-Identifier: GPL-2.0
2 /* Copyright(c) 2020 Intel Corporation. */
3
4 #define _GNU_SOURCE
5 #include <poll.h>
6 #include <pthread.h>
7 #include <signal.h>
8 #include <sched.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <sys/mman.h>
13 #include <sys/resource.h>
14 #include <sys/socket.h>
15 #include <sys/types.h>
16 #include <time.h>
17 #include <unistd.h>
18 #include <getopt.h>
19 #include <netinet/ether.h>
20 #include <net/if.h>
21
22 #include <linux/bpf.h>
23 #include <linux/if_link.h>
24 #include <linux/if_xdp.h>
25
26 #include <bpf/libbpf.h>
27 #include <bpf/xsk.h>
28 #include <bpf/bpf.h>
29
30 #define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
31
32 typedef __u64 u64;
33 typedef __u32 u32;
34 typedef __u16 u16;
35 typedef __u8  u8;
36
37 /* This program illustrates the packet forwarding between multiple AF_XDP
38  * sockets in multi-threaded environment. All threads are sharing a common
39  * buffer pool, with each socket having its own private buffer cache.
40  *
41  * Example 1: Single thread handling two sockets. The packets received by socket
42  * A (interface IFA, queue QA) are forwarded to socket B (interface IFB, queue
43  * QB), while the packets received by socket B are forwarded to socket A. The
44  * thread is running on CPU core X:
45  *
46  *         ./xsk_fwd -i IFA -q QA -i IFB -q QB -c X
47  *
48  * Example 2: Two threads, each handling two sockets. The thread running on CPU
49  * core X forwards all the packets received by socket A to socket B, and all the
50  * packets received by socket B to socket A. The thread running on CPU core Y is
51  * performing the same packet forwarding between sockets C and D:
52  *
53  *         ./xsk_fwd -i IFA -q QA -i IFB -q QB -i IFC -q QC -i IFD -q QD
54  *         -c CX -c CY
55  */
56
57 /*
58  * Buffer pool and buffer cache
59  *
60  * For packet forwarding, the packet buffers are typically allocated from the
61  * pool for packet reception and freed back to the pool for further reuse once
62  * the packet transmission is completed.
63  *
64  * The buffer pool is shared between multiple threads. In order to minimize the
65  * access latency to the shared buffer pool, each thread creates one (or
66  * several) buffer caches, which, unlike the buffer pool, are private to the
67  * thread that creates them and therefore cannot be shared with other threads.
68  * The access to the shared pool is only needed either (A) when the cache gets
69  * empty due to repeated buffer allocations and it needs to be replenished from
70  * the pool, or (B) when the cache gets full due to repeated buffer free and it
71  * needs to be flushed back to the pull.
72  *
73  * In a packet forwarding system, a packet received on any input port can
74  * potentially be transmitted on any output port, depending on the forwarding
75  * configuration. For AF_XDP sockets, for this to work with zero-copy of the
76  * packet buffers when, it is required that the buffer pool memory fits into the
77  * UMEM area shared by all the sockets.
78  */
79
80 struct bpool_params {
81         u32 n_buffers;
82         u32 buffer_size;
83         int mmap_flags;
84
85         u32 n_users_max;
86         u32 n_buffers_per_slab;
87 };
88
89 /* This buffer pool implementation organizes the buffers into equally sized
90  * slabs of *n_buffers_per_slab*. Initially, there are *n_slabs* slabs in the
91  * pool that are completely filled with buffer pointers (full slabs).
92  *
93  * Each buffer cache has a slab for buffer allocation and a slab for buffer
94  * free, with both of these slabs initially empty. When the cache's allocation
95  * slab goes empty, it is swapped with one of the available full slabs from the
96  * pool, if any is available. When the cache's free slab goes full, it is
97  * swapped for one of the empty slabs from the pool, which is guaranteed to
98  * succeed.
99  *
100  * Partially filled slabs never get traded between the cache and the pool
101  * (except when the cache itself is destroyed), which enables fast operation
102  * through pointer swapping.
103  */
104 struct bpool {
105         struct bpool_params params;
106         pthread_mutex_t lock;
107         void *addr;
108
109         u64 **slabs;
110         u64 **slabs_reserved;
111         u64 *buffers;
112         u64 *buffers_reserved;
113
114         u64 n_slabs;
115         u64 n_slabs_reserved;
116         u64 n_buffers;
117
118         u64 n_slabs_available;
119         u64 n_slabs_reserved_available;
120
121         struct xsk_umem_config umem_cfg;
122         struct xsk_ring_prod umem_fq;
123         struct xsk_ring_cons umem_cq;
124         struct xsk_umem *umem;
125 };
126
127 static struct bpool *
128 bpool_init(struct bpool_params *params,
129            struct xsk_umem_config *umem_cfg)
130 {
131         struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
132         u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved;
133         u64 slabs_size, slabs_reserved_size;
134         u64 buffers_size, buffers_reserved_size;
135         u64 total_size, i;
136         struct bpool *bp;
137         u8 *p;
138         int status;
139
140         /* mmap prep. */
141         if (setrlimit(RLIMIT_MEMLOCK, &r))
142                 return NULL;
143
144         /* bpool internals dimensioning. */
145         n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) /
146                 params->n_buffers_per_slab;
147         n_slabs_reserved = params->n_users_max * 2;
148         n_buffers = n_slabs * params->n_buffers_per_slab;
149         n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab;
150
151         slabs_size = n_slabs * sizeof(u64 *);
152         slabs_reserved_size = n_slabs_reserved * sizeof(u64 *);
153         buffers_size = n_buffers * sizeof(u64);
154         buffers_reserved_size = n_buffers_reserved * sizeof(u64);
155
156         total_size = sizeof(struct bpool) +
157                 slabs_size + slabs_reserved_size +
158                 buffers_size + buffers_reserved_size;
159
160         /* bpool memory allocation. */
161         p = calloc(total_size, sizeof(u8));
162         if (!p)
163                 return NULL;
164
165         /* bpool memory initialization. */
166         bp = (struct bpool *)p;
167         memcpy(&bp->params, params, sizeof(*params));
168         bp->params.n_buffers = n_buffers;
169
170         bp->slabs = (u64 **)&p[sizeof(struct bpool)];
171         bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) +
172                 slabs_size];
173         bp->buffers = (u64 *)&p[sizeof(struct bpool) +
174                 slabs_size + slabs_reserved_size];
175         bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) +
176                 slabs_size + slabs_reserved_size + buffers_size];
177
178         bp->n_slabs = n_slabs;
179         bp->n_slabs_reserved = n_slabs_reserved;
180         bp->n_buffers = n_buffers;
181
182         for (i = 0; i < n_slabs; i++)
183                 bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab];
184         bp->n_slabs_available = n_slabs;
185
186         for (i = 0; i < n_slabs_reserved; i++)
187                 bp->slabs_reserved[i] = &bp->buffers_reserved[i *
188                         params->n_buffers_per_slab];
189         bp->n_slabs_reserved_available = n_slabs_reserved;
190
191         for (i = 0; i < n_buffers; i++)
192                 bp->buffers[i] = i * params->buffer_size;
193
194         /* lock. */
195         status = pthread_mutex_init(&bp->lock, NULL);
196         if (status) {
197                 free(p);
198                 return NULL;
199         }
200
201         /* mmap. */
202         bp->addr = mmap(NULL,
203                         n_buffers * params->buffer_size,
204                         PROT_READ | PROT_WRITE,
205                         MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags,
206                         -1,
207                         0);
208         if (bp->addr == MAP_FAILED) {
209                 pthread_mutex_destroy(&bp->lock);
210                 free(p);
211                 return NULL;
212         }
213
214         /* umem. */
215         status = xsk_umem__create(&bp->umem,
216                                   bp->addr,
217                                   bp->params.n_buffers * bp->params.buffer_size,
218                                   &bp->umem_fq,
219                                   &bp->umem_cq,
220                                   umem_cfg);
221         if (status) {
222                 munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
223                 pthread_mutex_destroy(&bp->lock);
224                 free(p);
225                 return NULL;
226         }
227         memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg));
228
229         return bp;
230 }
231
232 static void
233 bpool_free(struct bpool *bp)
234 {
235         if (!bp)
236                 return;
237
238         xsk_umem__delete(bp->umem);
239         munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
240         pthread_mutex_destroy(&bp->lock);
241         free(bp);
242 }
243
244 struct bcache {
245         struct bpool *bp;
246
247         u64 *slab_cons;
248         u64 *slab_prod;
249
250         u64 n_buffers_cons;
251         u64 n_buffers_prod;
252 };
253
254 static u32
255 bcache_slab_size(struct bcache *bc)
256 {
257         struct bpool *bp = bc->bp;
258
259         return bp->params.n_buffers_per_slab;
260 }
261
262 static struct bcache *
263 bcache_init(struct bpool *bp)
264 {
265         struct bcache *bc;
266
267         bc = calloc(1, sizeof(struct bcache));
268         if (!bc)
269                 return NULL;
270
271         bc->bp = bp;
272         bc->n_buffers_cons = 0;
273         bc->n_buffers_prod = 0;
274
275         pthread_mutex_lock(&bp->lock);
276         if (bp->n_slabs_reserved_available == 0) {
277                 pthread_mutex_unlock(&bp->lock);
278                 free(bc);
279                 return NULL;
280         }
281
282         bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1];
283         bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2];
284         bp->n_slabs_reserved_available -= 2;
285         pthread_mutex_unlock(&bp->lock);
286
287         return bc;
288 }
289
290 static void
291 bcache_free(struct bcache *bc)
292 {
293         struct bpool *bp;
294
295         if (!bc)
296                 return;
297
298         /* In order to keep this example simple, the case of freeing any
299          * existing buffers from the cache back to the pool is ignored.
300          */
301
302         bp = bc->bp;
303         pthread_mutex_lock(&bp->lock);
304         bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod;
305         bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons;
306         bp->n_slabs_reserved_available += 2;
307         pthread_mutex_unlock(&bp->lock);
308
309         free(bc);
310 }
311
312 /* To work correctly, the implementation requires that the *n_buffers* input
313  * argument is never greater than the buffer pool's *n_buffers_per_slab*. This
314  * is typically the case, with one exception taking place when large number of
315  * buffers are allocated at init time (e.g. for the UMEM fill queue setup).
316  */
317 static inline u32
318 bcache_cons_check(struct bcache *bc, u32 n_buffers)
319 {
320         struct bpool *bp = bc->bp;
321         u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
322         u64 n_buffers_cons = bc->n_buffers_cons;
323         u64 n_slabs_available;
324         u64 *slab_full;
325
326         /*
327          * Consumer slab is not empty: Use what's available locally. Do not
328          * look for more buffers from the pool when the ask can only be
329          * partially satisfied.
330          */
331         if (n_buffers_cons)
332                 return (n_buffers_cons < n_buffers) ?
333                         n_buffers_cons :
334                         n_buffers;
335
336         /*
337          * Consumer slab is empty: look to trade the current consumer slab
338          * (full) for a full slab from the pool, if any is available.
339          */
340         pthread_mutex_lock(&bp->lock);
341         n_slabs_available = bp->n_slabs_available;
342         if (!n_slabs_available) {
343                 pthread_mutex_unlock(&bp->lock);
344                 return 0;
345         }
346
347         n_slabs_available--;
348         slab_full = bp->slabs[n_slabs_available];
349         bp->slabs[n_slabs_available] = bc->slab_cons;
350         bp->n_slabs_available = n_slabs_available;
351         pthread_mutex_unlock(&bp->lock);
352
353         bc->slab_cons = slab_full;
354         bc->n_buffers_cons = n_buffers_per_slab;
355         return n_buffers;
356 }
357
358 static inline u64
359 bcache_cons(struct bcache *bc)
360 {
361         u64 n_buffers_cons = bc->n_buffers_cons - 1;
362         u64 buffer;
363
364         buffer = bc->slab_cons[n_buffers_cons];
365         bc->n_buffers_cons = n_buffers_cons;
366         return buffer;
367 }
368
369 static inline void
370 bcache_prod(struct bcache *bc, u64 buffer)
371 {
372         struct bpool *bp = bc->bp;
373         u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
374         u64 n_buffers_prod = bc->n_buffers_prod;
375         u64 n_slabs_available;
376         u64 *slab_empty;
377
378         /*
379          * Producer slab is not yet full: store the current buffer to it.
380          */
381         if (n_buffers_prod < n_buffers_per_slab) {
382                 bc->slab_prod[n_buffers_prod] = buffer;
383                 bc->n_buffers_prod = n_buffers_prod + 1;
384                 return;
385         }
386
387         /*
388          * Producer slab is full: trade the cache's current producer slab
389          * (full) for an empty slab from the pool, then store the current
390          * buffer to the new producer slab. As one full slab exists in the
391          * cache, it is guaranteed that there is at least one empty slab
392          * available in the pool.
393          */
394         pthread_mutex_lock(&bp->lock);
395         n_slabs_available = bp->n_slabs_available;
396         slab_empty = bp->slabs[n_slabs_available];
397         bp->slabs[n_slabs_available] = bc->slab_prod;
398         bp->n_slabs_available = n_slabs_available + 1;
399         pthread_mutex_unlock(&bp->lock);
400
401         slab_empty[0] = buffer;
402         bc->slab_prod = slab_empty;
403         bc->n_buffers_prod = 1;
404 }
405
406 /*
407  * Port
408  *
409  * Each of the forwarding ports sits on top of an AF_XDP socket. In order for
410  * packet forwarding to happen with no packet buffer copy, all the sockets need
411  * to share the same UMEM area, which is used as the buffer pool memory.
412  */
413 #ifndef MAX_BURST_RX
414 #define MAX_BURST_RX 64
415 #endif
416
417 #ifndef MAX_BURST_TX
418 #define MAX_BURST_TX 64
419 #endif
420
421 struct burst_rx {
422         u64 addr[MAX_BURST_RX];
423         u32 len[MAX_BURST_RX];
424 };
425
426 struct burst_tx {
427         u64 addr[MAX_BURST_TX];
428         u32 len[MAX_BURST_TX];
429         u32 n_pkts;
430 };
431
432 struct port_params {
433         struct xsk_socket_config xsk_cfg;
434         struct bpool *bp;
435         const char *iface;
436         u32 iface_queue;
437 };
438
439 struct port {
440         struct port_params params;
441
442         struct bcache *bc;
443
444         struct xsk_ring_cons rxq;
445         struct xsk_ring_prod txq;
446         struct xsk_ring_prod umem_fq;
447         struct xsk_ring_cons umem_cq;
448         struct xsk_socket *xsk;
449         int umem_fq_initialized;
450
451         u64 n_pkts_rx;
452         u64 n_pkts_tx;
453 };
454
455 static void
456 port_free(struct port *p)
457 {
458         if (!p)
459                 return;
460
461         /* To keep this example simple, the code to free the buffers from the
462          * socket's receive and transmit queues, as well as from the UMEM fill
463          * and completion queues, is not included.
464          */
465
466         if (p->xsk)
467                 xsk_socket__delete(p->xsk);
468
469         bcache_free(p->bc);
470
471         free(p);
472 }
473
474 static struct port *
475 port_init(struct port_params *params)
476 {
477         struct port *p;
478         u32 umem_fq_size, pos = 0;
479         int status, i;
480
481         /* Memory allocation and initialization. */
482         p = calloc(sizeof(struct port), 1);
483         if (!p)
484                 return NULL;
485
486         memcpy(&p->params, params, sizeof(p->params));
487         umem_fq_size = params->bp->umem_cfg.fill_size;
488
489         /* bcache. */
490         p->bc = bcache_init(params->bp);
491         if (!p->bc ||
492             (bcache_slab_size(p->bc) < umem_fq_size) ||
493             (bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size)) {
494                 port_free(p);
495                 return NULL;
496         }
497
498         /* xsk socket. */
499         status = xsk_socket__create_shared(&p->xsk,
500                                            params->iface,
501                                            params->iface_queue,
502                                            params->bp->umem,
503                                            &p->rxq,
504                                            &p->txq,
505                                            &p->umem_fq,
506                                            &p->umem_cq,
507                                            &params->xsk_cfg);
508         if (status) {
509                 port_free(p);
510                 return NULL;
511         }
512
513         /* umem fq. */
514         xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos);
515
516         for (i = 0; i < umem_fq_size; i++)
517                 *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
518                         bcache_cons(p->bc);
519
520         xsk_ring_prod__submit(&p->umem_fq, umem_fq_size);
521         p->umem_fq_initialized = 1;
522
523         return p;
524 }
525
526 static inline u32
527 port_rx_burst(struct port *p, struct burst_rx *b)
528 {
529         u32 n_pkts, pos, i;
530
531         /* Free buffers for FQ replenish. */
532         n_pkts = ARRAY_SIZE(b->addr);
533
534         n_pkts = bcache_cons_check(p->bc, n_pkts);
535         if (!n_pkts)
536                 return 0;
537
538         /* RXQ. */
539         n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos);
540         if (!n_pkts) {
541                 if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
542                         struct pollfd pollfd = {
543                                 .fd = xsk_socket__fd(p->xsk),
544                                 .events = POLLIN,
545                         };
546
547                         poll(&pollfd, 1, 0);
548                 }
549                 return 0;
550         }
551
552         for (i = 0; i < n_pkts; i++) {
553                 b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr;
554                 b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len;
555         }
556
557         xsk_ring_cons__release(&p->rxq, n_pkts);
558         p->n_pkts_rx += n_pkts;
559
560         /* UMEM FQ. */
561         for ( ; ; ) {
562                 int status;
563
564                 status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos);
565                 if (status == n_pkts)
566                         break;
567
568                 if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
569                         struct pollfd pollfd = {
570                                 .fd = xsk_socket__fd(p->xsk),
571                                 .events = POLLIN,
572                         };
573
574                         poll(&pollfd, 1, 0);
575                 }
576         }
577
578         for (i = 0; i < n_pkts; i++)
579                 *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
580                         bcache_cons(p->bc);
581
582         xsk_ring_prod__submit(&p->umem_fq, n_pkts);
583
584         return n_pkts;
585 }
586
587 static inline void
588 port_tx_burst(struct port *p, struct burst_tx *b)
589 {
590         u32 n_pkts, pos, i;
591         int status;
592
593         /* UMEM CQ. */
594         n_pkts = p->params.bp->umem_cfg.comp_size;
595
596         n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos);
597
598         for (i = 0; i < n_pkts; i++) {
599                 u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i);
600
601                 bcache_prod(p->bc, addr);
602         }
603
604         xsk_ring_cons__release(&p->umem_cq, n_pkts);
605
606         /* TXQ. */
607         n_pkts = b->n_pkts;
608
609         for ( ; ; ) {
610                 status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos);
611                 if (status == n_pkts)
612                         break;
613
614                 if (xsk_ring_prod__needs_wakeup(&p->txq))
615                         sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT,
616                                NULL, 0);
617         }
618
619         for (i = 0; i < n_pkts; i++) {
620                 xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i];
621                 xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i];
622         }
623
624         xsk_ring_prod__submit(&p->txq, n_pkts);
625         if (xsk_ring_prod__needs_wakeup(&p->txq))
626                 sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
627         p->n_pkts_tx += n_pkts;
628 }
629
630 /*
631  * Thread
632  *
633  * Packet forwarding threads.
634  */
635 #ifndef MAX_PORTS_PER_THREAD
636 #define MAX_PORTS_PER_THREAD 16
637 #endif
638
639 struct thread_data {
640         struct port *ports_rx[MAX_PORTS_PER_THREAD];
641         struct port *ports_tx[MAX_PORTS_PER_THREAD];
642         u32 n_ports_rx;
643         struct burst_rx burst_rx;
644         struct burst_tx burst_tx[MAX_PORTS_PER_THREAD];
645         u32 cpu_core_id;
646         int quit;
647 };
648
649 static void swap_mac_addresses(void *data)
650 {
651         struct ether_header *eth = (struct ether_header *)data;
652         struct ether_addr *src_addr = (struct ether_addr *)&eth->ether_shost;
653         struct ether_addr *dst_addr = (struct ether_addr *)&eth->ether_dhost;
654         struct ether_addr tmp;
655
656         tmp = *src_addr;
657         *src_addr = *dst_addr;
658         *dst_addr = tmp;
659 }
660
661 static void *
662 thread_func(void *arg)
663 {
664         struct thread_data *t = arg;
665         cpu_set_t cpu_cores;
666         u32 i;
667
668         CPU_ZERO(&cpu_cores);
669         CPU_SET(t->cpu_core_id, &cpu_cores);
670         pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores);
671
672         for (i = 0; !t->quit; i = (i + 1) & (t->n_ports_rx - 1)) {
673                 struct port *port_rx = t->ports_rx[i];
674                 struct port *port_tx = t->ports_tx[i];
675                 struct burst_rx *brx = &t->burst_rx;
676                 struct burst_tx *btx = &t->burst_tx[i];
677                 u32 n_pkts, j;
678
679                 /* RX. */
680                 n_pkts = port_rx_burst(port_rx, brx);
681                 if (!n_pkts)
682                         continue;
683
684                 /* Process & TX. */
685                 for (j = 0; j < n_pkts; j++) {
686                         u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]);
687                         u8 *pkt = xsk_umem__get_data(port_rx->params.bp->addr,
688                                                      addr);
689
690                         swap_mac_addresses(pkt);
691
692                         btx->addr[btx->n_pkts] = brx->addr[j];
693                         btx->len[btx->n_pkts] = brx->len[j];
694                         btx->n_pkts++;
695
696                         if (btx->n_pkts == MAX_BURST_TX) {
697                                 port_tx_burst(port_tx, btx);
698                                 btx->n_pkts = 0;
699                         }
700                 }
701         }
702
703         return NULL;
704 }
705
706 /*
707  * Process
708  */
709 static const struct bpool_params bpool_params_default = {
710         .n_buffers = 64 * 1024,
711         .buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
712         .mmap_flags = 0,
713
714         .n_users_max = 16,
715         .n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
716 };
717
718 static const struct xsk_umem_config umem_cfg_default = {
719         .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
720         .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
721         .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
722         .frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
723         .flags = 0,
724 };
725
726 static const struct port_params port_params_default = {
727         .xsk_cfg = {
728                 .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
729                 .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
730                 .libbpf_flags = 0,
731                 .xdp_flags = XDP_FLAGS_DRV_MODE,
732                 .bind_flags = XDP_USE_NEED_WAKEUP | XDP_ZEROCOPY,
733         },
734
735         .bp = NULL,
736         .iface = NULL,
737         .iface_queue = 0,
738 };
739
740 #ifndef MAX_PORTS
741 #define MAX_PORTS 64
742 #endif
743
744 #ifndef MAX_THREADS
745 #define MAX_THREADS 64
746 #endif
747
748 static struct bpool_params bpool_params;
749 static struct xsk_umem_config umem_cfg;
750 static struct bpool *bp;
751
752 static struct port_params port_params[MAX_PORTS];
753 static struct port *ports[MAX_PORTS];
754 static u64 n_pkts_rx[MAX_PORTS];
755 static u64 n_pkts_tx[MAX_PORTS];
756 static int n_ports;
757
758 static pthread_t threads[MAX_THREADS];
759 static struct thread_data thread_data[MAX_THREADS];
760 static int n_threads;
761
762 static void
763 print_usage(char *prog_name)
764 {
765         const char *usage =
766                 "Usage:\n"
767                 "\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n"
768                 "\n"
769                 "-c CORE        CPU core to run a packet forwarding thread\n"
770                 "               on. May be invoked multiple times.\n"
771                 "\n"
772                 "-b SIZE        Number of buffers in the buffer pool shared\n"
773                 "               by all the forwarding threads. Default: %u.\n"
774                 "\n"
775                 "-i INTERFACE   Network interface. Each (INTERFACE, QUEUE)\n"
776                 "               pair specifies one forwarding port. May be\n"
777                 "               invoked multiple times.\n"
778                 "\n"
779                 "-q QUEUE       Network interface queue for RX and TX. Each\n"
780                 "               (INTERFACE, QUEUE) pair specified one\n"
781                 "               forwarding port. Default: %u. May be invoked\n"
782                 "               multiple times.\n"
783                 "\n";
784         printf(usage,
785                prog_name,
786                bpool_params_default.n_buffers,
787                port_params_default.iface_queue);
788 }
789
790 static int
791 parse_args(int argc, char **argv)
792 {
793         struct option lgopts[] = {
794                 { NULL,  0, 0, 0 }
795         };
796         int opt, option_index;
797
798         /* Parse the input arguments. */
799         for ( ; ;) {
800                 opt = getopt_long(argc, argv, "c:i:q:", lgopts, &option_index);
801                 if (opt == EOF)
802                         break;
803
804                 switch (opt) {
805                 case 'b':
806                         bpool_params.n_buffers = atoi(optarg);
807                         break;
808
809                 case 'c':
810                         if (n_threads == MAX_THREADS) {
811                                 printf("Max number of threads (%d) reached.\n",
812                                        MAX_THREADS);
813                                 return -1;
814                         }
815
816                         thread_data[n_threads].cpu_core_id = atoi(optarg);
817                         n_threads++;
818                         break;
819
820                 case 'i':
821                         if (n_ports == MAX_PORTS) {
822                                 printf("Max number of ports (%d) reached.\n",
823                                        MAX_PORTS);
824                                 return -1;
825                         }
826
827                         port_params[n_ports].iface = optarg;
828                         port_params[n_ports].iface_queue = 0;
829                         n_ports++;
830                         break;
831
832                 case 'q':
833                         if (n_ports == 0) {
834                                 printf("No port specified for queue.\n");
835                                 return -1;
836                         }
837                         port_params[n_ports - 1].iface_queue = atoi(optarg);
838                         break;
839
840                 default:
841                         printf("Illegal argument.\n");
842                         return -1;
843                 }
844         }
845
846         optind = 1; /* reset getopt lib */
847
848         /* Check the input arguments. */
849         if (!n_ports) {
850                 printf("No ports specified.\n");
851                 return -1;
852         }
853
854         if (!n_threads) {
855                 printf("No threads specified.\n");
856                 return -1;
857         }
858
859         if (n_ports % n_threads) {
860                 printf("Ports cannot be evenly distributed to threads.\n");
861                 return -1;
862         }
863
864         return 0;
865 }
866
867 static void
868 print_port(u32 port_id)
869 {
870         struct port *port = ports[port_id];
871
872         printf("Port %u: interface = %s, queue = %u\n",
873                port_id, port->params.iface, port->params.iface_queue);
874 }
875
876 static void
877 print_thread(u32 thread_id)
878 {
879         struct thread_data *t = &thread_data[thread_id];
880         u32 i;
881
882         printf("Thread %u (CPU core %u): ",
883                thread_id, t->cpu_core_id);
884
885         for (i = 0; i < t->n_ports_rx; i++) {
886                 struct port *port_rx = t->ports_rx[i];
887                 struct port *port_tx = t->ports_tx[i];
888
889                 printf("(%s, %u) -> (%s, %u), ",
890                        port_rx->params.iface,
891                        port_rx->params.iface_queue,
892                        port_tx->params.iface,
893                        port_tx->params.iface_queue);
894         }
895
896         printf("\n");
897 }
898
899 static void
900 print_port_stats_separator(void)
901 {
902         printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n",
903                "----",
904                "------------",
905                "-------------",
906                "------------",
907                "-------------");
908 }
909
910 static void
911 print_port_stats_header(void)
912 {
913         print_port_stats_separator();
914         printf("| %4s | %12s | %13s | %12s | %13s |\n",
915                "Port",
916                "RX packets",
917                "RX rate (pps)",
918                "TX packets",
919                "TX_rate (pps)");
920         print_port_stats_separator();
921 }
922
923 static void
924 print_port_stats_trailer(void)
925 {
926         print_port_stats_separator();
927         printf("\n");
928 }
929
930 static void
931 print_port_stats(int port_id, u64 ns_diff)
932 {
933         struct port *p = ports[port_id];
934         double rx_pps, tx_pps;
935
936         rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff;
937         tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. / ns_diff;
938
939         printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n",
940                port_id,
941                p->n_pkts_rx,
942                rx_pps,
943                p->n_pkts_tx,
944                tx_pps);
945
946         n_pkts_rx[port_id] = p->n_pkts_rx;
947         n_pkts_tx[port_id] = p->n_pkts_tx;
948 }
949
950 static void
951 print_port_stats_all(u64 ns_diff)
952 {
953         int i;
954
955         print_port_stats_header();
956         for (i = 0; i < n_ports; i++)
957                 print_port_stats(i, ns_diff);
958         print_port_stats_trailer();
959 }
960
961 static int quit;
962
963 static void
964 signal_handler(int sig)
965 {
966         quit = 1;
967 }
968
969 static void remove_xdp_program(void)
970 {
971         int i;
972
973         for (i = 0 ; i < n_ports; i++)
974                 bpf_set_link_xdp_fd(if_nametoindex(port_params[i].iface), -1,
975                                     port_params[i].xsk_cfg.xdp_flags);
976 }
977
978 int main(int argc, char **argv)
979 {
980         struct timespec time;
981         u64 ns0;
982         int i;
983
984         /* Parse args. */
985         memcpy(&bpool_params, &bpool_params_default,
986                sizeof(struct bpool_params));
987         memcpy(&umem_cfg, &umem_cfg_default,
988                sizeof(struct xsk_umem_config));
989         for (i = 0; i < MAX_PORTS; i++)
990                 memcpy(&port_params[i], &port_params_default,
991                        sizeof(struct port_params));
992
993         if (parse_args(argc, argv)) {
994                 print_usage(argv[0]);
995                 return -1;
996         }
997
998         /* Buffer pool initialization. */
999         bp = bpool_init(&bpool_params, &umem_cfg);
1000         if (!bp) {
1001                 printf("Buffer pool initialization failed.\n");
1002                 return -1;
1003         }
1004         printf("Buffer pool created successfully.\n");
1005
1006         /* Ports initialization. */
1007         for (i = 0; i < MAX_PORTS; i++)
1008                 port_params[i].bp = bp;
1009
1010         for (i = 0; i < n_ports; i++) {
1011                 ports[i] = port_init(&port_params[i]);
1012                 if (!ports[i]) {
1013                         printf("Port %d initialization failed.\n", i);
1014                         return -1;
1015                 }
1016                 print_port(i);
1017         }
1018         printf("All ports created successfully.\n");
1019
1020         /* Threads. */
1021         for (i = 0; i < n_threads; i++) {
1022                 struct thread_data *t = &thread_data[i];
1023                 u32 n_ports_per_thread = n_ports / n_threads, j;
1024
1025                 for (j = 0; j < n_ports_per_thread; j++) {
1026                         t->ports_rx[j] = ports[i * n_ports_per_thread + j];
1027                         t->ports_tx[j] = ports[i * n_ports_per_thread +
1028                                 (j + 1) % n_ports_per_thread];
1029                 }
1030
1031                 t->n_ports_rx = n_ports_per_thread;
1032
1033                 print_thread(i);
1034         }
1035
1036         for (i = 0; i < n_threads; i++) {
1037                 int status;
1038
1039                 status = pthread_create(&threads[i],
1040                                         NULL,
1041                                         thread_func,
1042                                         &thread_data[i]);
1043                 if (status) {
1044                         printf("Thread %d creation failed.\n", i);
1045                         return -1;
1046                 }
1047         }
1048         printf("All threads created successfully.\n");
1049
1050         /* Print statistics. */
1051         signal(SIGINT, signal_handler);
1052         signal(SIGTERM, signal_handler);
1053         signal(SIGABRT, signal_handler);
1054
1055         clock_gettime(CLOCK_MONOTONIC, &time);
1056         ns0 = time.tv_sec * 1000000000UL + time.tv_nsec;
1057         for ( ; !quit; ) {
1058                 u64 ns1, ns_diff;
1059
1060                 sleep(1);
1061                 clock_gettime(CLOCK_MONOTONIC, &time);
1062                 ns1 = time.tv_sec * 1000000000UL + time.tv_nsec;
1063                 ns_diff = ns1 - ns0;
1064                 ns0 = ns1;
1065
1066                 print_port_stats_all(ns_diff);
1067         }
1068
1069         /* Threads completion. */
1070         printf("Quit.\n");
1071         for (i = 0; i < n_threads; i++)
1072                 thread_data[i].quit = 1;
1073
1074         for (i = 0; i < n_threads; i++)
1075                 pthread_join(threads[i], NULL);
1076
1077         for (i = 0; i < n_ports; i++)
1078                 port_free(ports[i]);
1079
1080         bpool_free(bp);
1081
1082         remove_xdp_program();
1083
1084         return 0;
1085 }