Merge tag 'nfsd-5.15-1' of git://git.kernel.org/pub/scm/linux/kernel/git/cel/linux
[linux-2.6-microblaze.git] / net / sunrpc / xprtrdma / svc_rdma_pcl.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Copyright (c) 2020 Oracle. All rights reserved.
4  */
5
6 #include <linux/sunrpc/svc_rdma.h>
7 #include <linux/sunrpc/rpc_rdma.h>
8
9 #include "xprt_rdma.h"
10 #include <trace/events/rpcrdma.h>
11
12 /**
13  * pcl_free - Release all memory associated with a parsed chunk list
14  * @pcl: parsed chunk list
15  *
16  */
17 void pcl_free(struct svc_rdma_pcl *pcl)
18 {
19         while (!list_empty(&pcl->cl_chunks)) {
20                 struct svc_rdma_chunk *chunk;
21
22                 chunk = pcl_first_chunk(pcl);
23                 list_del(&chunk->ch_list);
24                 kfree(chunk);
25         }
26 }
27
28 static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position)
29 {
30         struct svc_rdma_chunk *chunk;
31
32         chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL);
33         if (!chunk)
34                 return NULL;
35
36         chunk->ch_position = position;
37         chunk->ch_length = 0;
38         chunk->ch_payload_length = 0;
39         chunk->ch_segcount = 0;
40         return chunk;
41 }
42
43 static struct svc_rdma_chunk *
44 pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position)
45 {
46         struct svc_rdma_chunk *pos;
47
48         pcl_for_each_chunk(pos, pcl) {
49                 if (pos->ch_position == position)
50                         return pos;
51         }
52         return NULL;
53 }
54
55 static void pcl_insert_position(struct svc_rdma_pcl *pcl,
56                                 struct svc_rdma_chunk *chunk)
57 {
58         struct svc_rdma_chunk *pos;
59
60         pcl_for_each_chunk(pos, pcl) {
61                 if (pos->ch_position > chunk->ch_position)
62                         break;
63         }
64         __list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list);
65         pcl->cl_count++;
66 }
67
68 static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt,
69                                  struct svc_rdma_chunk *chunk,
70                                  u32 handle, u32 length, u64 offset)
71 {
72         struct svc_rdma_segment *segment;
73
74         segment = &chunk->ch_segments[chunk->ch_segcount];
75         segment->rs_handle = handle;
76         segment->rs_length = length;
77         segment->rs_offset = offset;
78
79         trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, segment);
80
81         chunk->ch_length += length;
82         chunk->ch_segcount++;
83 }
84
85 /**
86  * pcl_alloc_call - Construct a parsed chunk list for the Call body
87  * @rctxt: Ingress receive context
88  * @p: Start of an un-decoded Read list
89  *
90  * Assumptions:
91  * - The incoming Read list has already been sanity checked.
92  * - cl_count is already set to the number of segments in
93  *   the un-decoded list.
94  * - The list might not be in order by position.
95  *
96  * Return values:
97  *       %true: Parsed chunk list was successfully constructed, and
98  *              cl_count is updated to be the number of chunks (ie.
99  *              unique positions) in the Read list.
100  *      %false: Memory allocation failed.
101  */
102 bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
103 {
104         struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl;
105         unsigned int i, segcount = pcl->cl_count;
106
107         pcl->cl_count = 0;
108         for (i = 0; i < segcount; i++) {
109                 struct svc_rdma_chunk *chunk;
110                 u32 position, handle, length;
111                 u64 offset;
112
113                 p++;    /* skip the list discriminator */
114                 p = xdr_decode_read_segment(p, &position, &handle,
115                                             &length, &offset);
116                 if (position != 0)
117                         continue;
118
119                 if (pcl_is_empty(pcl)) {
120                         chunk = pcl_alloc_chunk(segcount, position);
121                         if (!chunk)
122                                 return false;
123                         pcl_insert_position(pcl, chunk);
124                 } else {
125                         chunk = list_first_entry(&pcl->cl_chunks,
126                                                  struct svc_rdma_chunk,
127                                                  ch_list);
128                 }
129
130                 pcl_set_read_segment(rctxt, chunk, handle, length, offset);
131         }
132
133         return true;
134 }
135
136 /**
137  * pcl_alloc_read - Construct a parsed chunk list for normal Read chunks
138  * @rctxt: Ingress receive context
139  * @p: Start of an un-decoded Read list
140  *
141  * Assumptions:
142  * - The incoming Read list has already been sanity checked.
143  * - cl_count is already set to the number of segments in
144  *   the un-decoded list.
145  * - The list might not be in order by position.
146  *
147  * Return values:
148  *       %true: Parsed chunk list was successfully constructed, and
149  *              cl_count is updated to be the number of chunks (ie.
150  *              unique position values) in the Read list.
151  *      %false: Memory allocation failed.
152  *
153  * TODO:
154  * - Check for chunk range overlaps
155  */
156 bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
157 {
158         struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl;
159         unsigned int i, segcount = pcl->cl_count;
160
161         pcl->cl_count = 0;
162         for (i = 0; i < segcount; i++) {
163                 struct svc_rdma_chunk *chunk;
164                 u32 position, handle, length;
165                 u64 offset;
166
167                 p++;    /* skip the list discriminator */
168                 p = xdr_decode_read_segment(p, &position, &handle,
169                                             &length, &offset);
170                 if (position == 0)
171                         continue;
172
173                 chunk = pcl_lookup_position(pcl, position);
174                 if (!chunk) {
175                         chunk = pcl_alloc_chunk(segcount, position);
176                         if (!chunk)
177                                 return false;
178                         pcl_insert_position(pcl, chunk);
179                 }
180
181                 pcl_set_read_segment(rctxt, chunk, handle, length, offset);
182         }
183
184         return true;
185 }
186
187 /**
188  * pcl_alloc_write - Construct a parsed chunk list from a Write list
189  * @rctxt: Ingress receive context
190  * @pcl: Parsed chunk list to populate
191  * @p: Start of an un-decoded Write list
192  *
193  * Assumptions:
194  * - The incoming Write list has already been sanity checked, and
195  * - cl_count is set to the number of chunks in the un-decoded list.
196  *
197  * Return values:
198  *       %true: Parsed chunk list was successfully constructed.
199  *      %false: Memory allocation failed.
200  */
201 bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt,
202                      struct svc_rdma_pcl *pcl, __be32 *p)
203 {
204         struct svc_rdma_segment *segment;
205         struct svc_rdma_chunk *chunk;
206         unsigned int i, j;
207         u32 segcount;
208
209         for (i = 0; i < pcl->cl_count; i++) {
210                 p++;    /* skip the list discriminator */
211                 segcount = be32_to_cpup(p++);
212
213                 chunk = pcl_alloc_chunk(segcount, 0);
214                 if (!chunk)
215                         return false;
216                 list_add_tail(&chunk->ch_list, &pcl->cl_chunks);
217
218                 for (j = 0; j < segcount; j++) {
219                         segment = &chunk->ch_segments[j];
220                         p = xdr_decode_rdma_segment(p, &segment->rs_handle,
221                                                     &segment->rs_length,
222                                                     &segment->rs_offset);
223                         trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j);
224
225                         chunk->ch_length += segment->rs_length;
226                         chunk->ch_segcount++;
227                 }
228         }
229         return true;
230 }
231
232 static int pcl_process_region(const struct xdr_buf *xdr,
233                               unsigned int offset, unsigned int length,
234                               int (*actor)(const struct xdr_buf *, void *),
235                               void *data)
236 {
237         struct xdr_buf subbuf;
238
239         if (!length)
240                 return 0;
241         if (xdr_buf_subsegment(xdr, &subbuf, offset, length))
242                 return -EMSGSIZE;
243         return actor(&subbuf, data);
244 }
245
246 /**
247  * pcl_process_nonpayloads - Process non-payload regions inside @xdr
248  * @pcl: Chunk list to process
249  * @xdr: xdr_buf to process
250  * @actor: Function to invoke on each non-payload region
251  * @data: Arguments for @actor
252  *
253  * This mechanism must ignore not only result payloads that were already
254  * sent via RDMA Write, but also XDR padding for those payloads that
255  * the upper layer has added.
256  *
257  * Assumptions:
258  *  The xdr->len and ch_position fields are aligned to 4-byte multiples.
259  *
260  * Returns:
261  *   On success, zero,
262  *   %-EMSGSIZE on XDR buffer overflow, or
263  *   The return value of @actor
264  */
265 int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl,
266                             const struct xdr_buf *xdr,
267                             int (*actor)(const struct xdr_buf *, void *),
268                             void *data)
269 {
270         struct svc_rdma_chunk *chunk, *next;
271         unsigned int start;
272         int ret;
273
274         chunk = pcl_first_chunk(pcl);
275
276         /* No result payloads were generated */
277         if (!chunk || !chunk->ch_payload_length)
278                 return actor(xdr, data);
279
280         /* Process the region before the first result payload */
281         ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data);
282         if (ret < 0)
283                 return ret;
284
285         /* Process the regions between each middle result payload */
286         while ((next = pcl_next_chunk(pcl, chunk))) {
287                 if (!next->ch_payload_length)
288                         break;
289
290                 start = pcl_chunk_end_offset(chunk);
291                 ret = pcl_process_region(xdr, start, next->ch_position - start,
292                                          actor, data);
293                 if (ret < 0)
294                         return ret;
295
296                 chunk = next;
297         }
298
299         /* Process the region after the last result payload */
300         start = pcl_chunk_end_offset(chunk);
301         ret = pcl_process_region(xdr, start, xdr->len - start, actor, data);
302         if (ret < 0)
303                 return ret;
304
305         return 0;
306 }