diff --git a/kernel/bpf/cpumap.c b/kernel/bpf/cpumap.c
index 8ebd0fa..ef49e17 100644
--- a/kernel/bpf/cpumap.c
+++ b/kernel/bpf/cpumap.c
 
 /* General idea: XDP packets that get XDP redirected to another CPU
  * will be stored/queued for at most one driver ->poll() call.  It is
- * guaranteed that setting flush bit and flush operation happen on
+ * guaranteed that queueing the frame and the flush operation happen on
  * the same CPU.  Thus, the cpu_map_flush operation can deduce via
  * this_cpu_ptr() which queue in bpf_cpu_map_entry contains packets.
  */
 
 #define CPU_MAP_BULK_SIZE 8  /* 8 == one cacheline on 64-bit archs */
+struct bpf_cpu_map_entry;
+struct bpf_cpu_map;
+
 struct xdp_bulk_queue {
        void *q[CPU_MAP_BULK_SIZE];
+       struct list_head flush_node;
+       struct bpf_cpu_map_entry *obj;
        unsigned int count;
 };
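/*
 * Editorial aside, not part of the patch: a minimal userspace sketch of the
 * data-structure change above. Instead of a per-CPU bitmap indexed by map
 * slot, each bulk queue now carries its own list node plus a back-pointer to
 * its owner, so a per-CPU list can name exactly the queues that hold frames.
 * "list_node", "bulk_queue" and "owner" are hypothetical stand-ins for the
 * kernel's struct list_head, struct xdp_bulk_queue and struct bpf_cpu_map_entry.
 */
struct list_node {
        struct list_node *prev, *next;
};

struct owner;                           /* stands in for bpf_cpu_map_entry */

struct bulk_queue {
        void *q[8];                     /* CPU_MAP_BULK_SIZE worth of frames */
        struct list_node flush_node;    /* links the queue into the per-CPU flush list */
        struct owner *obj;              /* lets the flush code find the owner */
        unsigned int count;
};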
 
@@ -52,6 +57,8 @@ struct bpf_cpu_map_entry {
        /* XDP can run multiple RX-ring queues, need __percpu enqueue store */
        struct xdp_bulk_queue __percpu *bulkq;
 
+       struct bpf_cpu_map *cmap;
+
        /* Queue with potential multi-producers, and single-consumer kthread */
        struct ptr_ring *queue;
        struct task_struct *kthread;
@@ -65,23 +72,17 @@ struct bpf_cpu_map {
        struct bpf_map map;
        /* Below members specific for map type */
        struct bpf_cpu_map_entry **cpu_map;
-       unsigned long __percpu *flush_needed;
+       struct list_head __percpu *flush_list;
 };
 
-static int bq_flush_to_queue(struct bpf_cpu_map_entry *rcpu,
-                            struct xdp_bulk_queue *bq, bool in_napi_ctx);
-
-static u64 cpu_map_bitmap_size(const union bpf_attr *attr)
-{
-       return BITS_TO_LONGS(attr->max_entries) * sizeof(unsigned long);
-}
+static int bq_flush_to_queue(struct xdp_bulk_queue *bq, bool in_napi_ctx);
 
 static struct bpf_map *cpu_map_alloc(union bpf_attr *attr)
 {
        struct bpf_cpu_map *cmap;
        int err = -ENOMEM;
+       int ret, cpu;
        u64 cost;
-       int ret;
 
        if (!capable(CAP_SYS_ADMIN))
                return ERR_PTR(-EPERM);
@@ -105,23 +106,21 @@ static struct bpf_map *cpu_map_alloc(union bpf_attr *attr)
 
        /* make sure page count doesn't overflow */
        cost = (u64) cmap->map.max_entries * sizeof(struct bpf_cpu_map_entry *);
-       cost += cpu_map_bitmap_size(attr) * num_possible_cpus();
-       if (cost >= U32_MAX - PAGE_SIZE)
-               goto free_cmap;
-       cmap->map.pages = round_up(cost, PAGE_SIZE) >> PAGE_SHIFT;
+       cost += sizeof(struct list_head) * num_possible_cpus();
 
 /* Notice: returns -EPERM if the map size is larger than the memlock limit */
-       ret = bpf_map_precharge_memlock(cmap->map.pages);
+       ret = bpf_map_charge_init(&cmap->map.memory, cost);
        if (ret) {
                err = ret;
                goto free_cmap;
        }
 
-       /* A per cpu bitfield with a bit per possible CPU in map  */
-       cmap->flush_needed = __alloc_percpu(cpu_map_bitmap_size(attr),
-                                           __alignof__(unsigned long));
-       if (!cmap->flush_needed)
-               goto free_cmap;
+       cmap->flush_list = alloc_percpu(struct list_head);
+       if (!cmap->flush_list)
+               goto free_charge;
+
+       for_each_possible_cpu(cpu)
+               INIT_LIST_HEAD(per_cpu_ptr(cmap->flush_list, cpu));
 
        /* Alloc array for possible remote "destination" CPUs */
        cmap->cpu_map = bpf_map_area_alloc(cmap->map.max_entries *
@@ -132,7 +131,9 @@ static struct bpf_map *cpu_map_alloc(union bpf_attr *attr)
 
        return &cmap->map;
 free_percpu:
-       free_percpu(cmap->flush_needed);
+       free_percpu(cmap->flush_list);
+free_charge:
+       bpf_map_charge_finish(&cmap->map.memory);
 free_cmap:
        kfree(cmap);
        return ERR_PTR(err);
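/*
 * Editorial aside, not part of the patch: cpu_map_alloc() above keeps the
 * usual kernel "unwind ladder" error handling; the new free_charge label
 * exists so bpf_map_charge_finish() undoes bpf_map_charge_init() when a later
 * allocation fails. A compilable sketch with hypothetical stubs standing in
 * for the real calls:
 */
static int charge(void)       { return 0; }  /* e.g. bpf_map_charge_init()   */
static int alloc_lists(void)  { return 0; }  /* e.g. alloc_percpu()          */
static int alloc_array(void)  { return -1; } /* pretend the last step fails  */
static void undo_lists(void)  { }            /* e.g. free_percpu()           */
static void undo_charge(void) { }            /* e.g. bpf_map_charge_finish() */

int setup_sketch(void)
{
        int err = -1;

        if (charge())
                goto out;
        if (alloc_lists())
                goto free_charge;
        if (alloc_array())
                goto free_lists;
        return 0;

free_lists:
        undo_lists();                   /* undo steps in reverse order */
free_charge:
        undo_charge();
out:
        return err;
}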
@@ -209,6 +210,9 @@ static struct sk_buff *cpu_map_build_skb(struct bpf_cpu_map_entry *rcpu,
         * - RX ring dev queue index    (skb_record_rx_queue)
         */
 
+       /* Until page_pool gets an SKB return path, release DMA here */
+       xdp_release_frame(xdpf);
+
        /* Allow SKB to reuse area used by xdp_frame */
        xdp_scrub_frame(xdpf);
 
@@ -332,7 +336,8 @@ static struct bpf_cpu_map_entry *__cpu_map_entry_alloc(u32 qsize, u32 cpu,
 {
        gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
        struct bpf_cpu_map_entry *rcpu;
-       int numa, err;
+       struct xdp_bulk_queue *bq;
+       int numa, err, i;
 
        /* Have map->numa_node, but choose node of redirect target CPU */
        numa = cpu_to_node(cpu);
@@ -347,6 +352,11 @@ static struct bpf_cpu_map_entry *__cpu_map_entry_alloc(u32 qsize, u32 cpu,
        if (!rcpu->bulkq)
                goto free_rcu;
 
+       for_each_possible_cpu(i) {
+               bq = per_cpu_ptr(rcpu->bulkq, i);
+               bq->obj = rcpu;
+       }
+
        /* Alloc queue */
        rcpu->queue = kzalloc_node(sizeof(*rcpu->queue), gfp, numa);
        if (!rcpu->queue)
@@ -403,7 +413,7 @@ static void __cpu_map_entry_free(struct rcu_head *rcu)
                struct xdp_bulk_queue *bq = per_cpu_ptr(rcpu->bulkq, cpu);
 
                /* No concurrent bq_enqueue can run at this point */
-               bq_flush_to_queue(rcpu, bq, false);
+               bq_flush_to_queue(bq, false);
        }
        free_percpu(rcpu->bulkq);
        /* Cannot kthread_stop() here, the last put frees rcpu resources */
@@ -486,6 +496,7 @@ static int cpu_map_update_elem(struct bpf_map *map, void *key, void *value,
                rcpu = __cpu_map_entry_alloc(qsize, key_cpu, map->id);
                if (!rcpu)
                        return -ENOMEM;
+               rcpu->cmap = cmap;
        }
        rcu_read_lock();
        __cpu_map_entry_replace(cmap, key_cpu, rcpu);
@@ -512,14 +523,14 @@ static void cpu_map_free(struct bpf_map *map)
        synchronize_rcu();
 
        /* To ensure all pending flush operations have completed, wait for the flush
-        * bitmap to indicate all flush_needed bits to be zero on _all_ cpus.
-        * Because the above synchronize_rcu() ensures the map is disconnected
-        * from the program we can assume no new bits will be set.
+        * list to be empty on _all_ CPUs. Because the above synchronize_rcu()
+        * ensures the map is disconnected from the program, we can assume no new
+        * items will be added to the list.
         */
        for_each_online_cpu(cpu) {
-               unsigned long *bitmap = per_cpu_ptr(cmap->flush_needed, cpu);
+               struct list_head *flush_list = per_cpu_ptr(cmap->flush_list, cpu);
 
-               while (!bitmap_empty(bitmap, cmap->map.max_entries))
+               while (!list_empty(flush_list))
                        cond_resched();
        }
 
@@ -536,7 +547,7 @@ static void cpu_map_free(struct bpf_map *map)
                /* bq flush and cleanup happen after the RCU grace period */
                __cpu_map_entry_replace(cmap, i, NULL); /* call_rcu */
        }
-       free_percpu(cmap->flush_needed);
+       free_percpu(cmap->flush_list);
        bpf_map_area_free(cmap->cpu_map);
        kfree(cmap);
 }
@@ -588,9 +599,9 @@ const struct bpf_map_ops cpu_map_ops = {
        .map_check_btf          = map_check_no_btf,
 };
 
-static int bq_flush_to_queue(struct bpf_cpu_map_entry *rcpu,
-                            struct xdp_bulk_queue *bq, bool in_napi_ctx)
+static int bq_flush_to_queue(struct xdp_bulk_queue *bq, bool in_napi_ctx)
 {
+       struct bpf_cpu_map_entry *rcpu = bq->obj;
        unsigned int processed = 0, drops = 0;
        const int to_cpu = rcpu->cpu;
        struct ptr_ring *q;
@@ -619,6 +630,8 @@ static int bq_flush_to_queue(struct bpf_cpu_map_entry *rcpu,
        bq->count = 0;
        spin_unlock(&q->producer_lock);
 
+       __list_del_clearprev(&bq->flush_node);
+
        /* Feedback loop via tracepoints */
        trace_xdp_cpumap_enqueue(rcpu->map_id, processed, drops, to_cpu);
        return 0;
@@ -629,10 +642,11 @@ static int bq_flush_to_queue(struct bpf_cpu_map_entry *rcpu,
  */
 static int bq_enqueue(struct bpf_cpu_map_entry *rcpu, struct xdp_frame *xdpf)
 {
+       struct list_head *flush_list = this_cpu_ptr(rcpu->cmap->flush_list);
        struct xdp_bulk_queue *bq = this_cpu_ptr(rcpu->bulkq);
 
        if (unlikely(bq->count == CPU_MAP_BULK_SIZE))
-               bq_flush_to_queue(rcpu, bq, true);
+               bq_flush_to_queue(bq, true);
 
        /* Notice, xdp_buff/page MUST be queued here, long enough for
         * the driver code invoking us to finish, because the driver
@@ -644,6 +658,10 @@ static int bq_enqueue(struct bpf_cpu_map_entry *rcpu, struct xdp_frame *xdpf)
         * operation, when completing napi->poll call.
         */
        bq->q[bq->count++] = xdpf;
+
+       if (!bq->flush_node.prev)
+               list_add(&bq->flush_node, flush_list);
+
        return 0;
 }
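/*
 * Editorial aside, not part of the patch: bq_enqueue() above treats
 * flush_node.prev == NULL as "this queue is not yet on the per-CPU flush
 * list" (the per-CPU bulk queues start out zeroed, and the flush side clears
 * prev again via __list_del_clearprev()). A minimal userspace sketch of that
 * pairing, reusing the hypothetical list_node/bulk_queue types from the
 * earlier aside:
 */
#include <stddef.h>                     /* NULL, offsetof() */

void list_init(struct list_node *head)  /* cf. INIT_LIST_HEAD() */
{
        head->prev = head->next = head;
}

void node_add(struct list_node *n, struct list_node *head)
{
        n->next = head->next;
        n->prev = head;
        head->next->prev = n;
        head->next = n;
}

void node_del_clearprev(struct list_node *n)
{
        n->next->prev = n->prev;
        n->prev->next = n->next;
        n->prev = NULL;                 /* back to "not queued" */
}

void enqueue_frame(struct bulk_queue *bq, void *frame,
                   struct list_node *flush_list)
{
        bq->q[bq->count++] = frame;     /* caller flushes when the queue is full */
        if (!bq->flush_node.prev)       /* first frame since last flush: join list */
                node_add(&bq->flush_node, flush_list);
}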
 
@@ -663,41 +681,16 @@ int cpu_map_enqueue(struct bpf_cpu_map_entry *rcpu, struct xdp_buff *xdp,
        return 0;
 }
 
-void __cpu_map_insert_ctx(struct bpf_map *map, u32 bit)
-{
-       struct bpf_cpu_map *cmap = container_of(map, struct bpf_cpu_map, map);
-       unsigned long *bitmap = this_cpu_ptr(cmap->flush_needed);
-
-       __set_bit(bit, bitmap);
-}
-
 void __cpu_map_flush(struct bpf_map *map)
 {
        struct bpf_cpu_map *cmap = container_of(map, struct bpf_cpu_map, map);
-       unsigned long *bitmap = this_cpu_ptr(cmap->flush_needed);
-       u32 bit;
-
-       /* The napi->poll softirq makes sure __cpu_map_insert_ctx()
-        * and __cpu_map_flush() happen on same CPU. Thus, the percpu
-        * bitmap indicate which percpu bulkq have packets.
-        */
-       for_each_set_bit(bit, bitmap, map->max_entries) {
-               struct bpf_cpu_map_entry *rcpu = READ_ONCE(cmap->cpu_map[bit]);
-               struct xdp_bulk_queue *bq;
-
-               /* This is possible if entry is removed by user space
-                * between xdp redirect and flush op.
-                */
-               if (unlikely(!rcpu))
-                       continue;
-
-               __clear_bit(bit, bitmap);
+       struct list_head *flush_list = this_cpu_ptr(cmap->flush_list);
+       struct xdp_bulk_queue *bq, *tmp;
 
-               /* Flush all frames in bulkq to real queue */
-               bq = this_cpu_ptr(rcpu->bulkq);
-               bq_flush_to_queue(rcpu, bq, true);
+       list_for_each_entry_safe(bq, tmp, flush_list, flush_node) {
+               bq_flush_to_queue(bq, true);
 
                /* If already running, costs spin_lock_irqsave + smp_mb */
-               wake_up_process(rcpu->kthread);
+               wake_up_process(bq->obj->kthread);
        }
 }
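/*
 * Editorial aside, not part of the patch: __cpu_map_flush() now only walks
 * whatever bq_enqueue() linked onto the per-CPU list, so the cost scales with
 * the number of queues that actually hold frames rather than with
 * map->max_entries as the old bitmap scan did. The _safe iteration matters
 * because bq_flush_to_queue() unlinks each node while walking. A sketch of
 * the same walk over the hypothetical types and helpers from the asides
 * above, using the classic offsetof()-based container_of idiom:
 */
#define bq_from_node(n) \
        ((struct bulk_queue *)((char *)(n) - offsetof(struct bulk_queue, flush_node)))

void flush_all_sketch(struct list_node *flush_list)
{
        struct list_node *n = flush_list->next, *next;

        while (n != flush_list) {
                struct bulk_queue *bq = bq_from_node(n);

                next = n->next;         /* grab before unlinking */
                bq->count = 0;          /* stand-in for draining q[] to the ptr_ring */
                node_del_clearprev(n);  /* leaves prev == NULL for the next round */
                n = next;
        }
}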