1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Copyright (c) 2016-2018 Oracle.  All rights reserved.
4  *
5  * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
6  */
7
8 #include <rdma/rw.h>
9
10 #include <linux/sunrpc/xdr.h>
11 #include <linux/sunrpc/rpc_rdma.h>
12 #include <linux/sunrpc/svc_rdma.h>
13
14 #include "xprt_rdma.h"
15 #include <trace/events/rpcrdma.h>
16
17 static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
18 static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
19
20 /* Each R/W context contains state for one chain of RDMA Read or
21  * Write Work Requests.
22  *
23  * Each WR chain handles a single contiguous server-side buffer,
24  * because scatterlist entries after the first have to start on
25  * page alignment. xdr_buf iovecs cannot guarantee alignment.
26  *
27  * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
28  * from a client may contain a unique R_key, so each WR chain moves
29  * up to one segment at a time.
30  *
31  * The scatterlist makes this data structure over 4KB in size. To
32  * make it less likely to fail, and to handle the allocation for
33  * smaller I/O requests without disabling bottom-halves, these
34  * contexts are created on demand, but cached and reused until the
35  * controlling svcxprt_rdma is destroyed.
36  */
37 struct svc_rdma_rw_ctxt {
38         struct llist_node       rw_node;
39         struct list_head        rw_list;
40         struct rdma_rw_ctx      rw_ctx;
41         unsigned int            rw_nents;
42         struct sg_table         rw_sg_table;
43         struct scatterlist      rw_first_sgl[];
44 };
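
/* Note: the rw_first_sgl[] flexible array provides the first
 * SG_CHUNK_SIZE scatterlist entries inline with the context itself,
 * so small I/O requests need no separate scatterlist allocation.
 * svc_rdma_get_rw_ctxt() passes this array to sg_alloc_table_chained(),
 * which chains on additional entries only when "sges" exceeds
 * SG_CHUNK_SIZE.
 */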
45
46 static inline struct svc_rdma_rw_ctxt *
47 svc_rdma_next_ctxt(struct list_head *list)
48 {
49         return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
50                                         rw_list);
51 }
52
53 static struct svc_rdma_rw_ctxt *
54 svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
55 {
56         struct svc_rdma_rw_ctxt *ctxt;
57         struct llist_node *node;
58
59         spin_lock(&rdma->sc_rw_ctxt_lock);
60         node = llist_del_first(&rdma->sc_rw_ctxts);
61         spin_unlock(&rdma->sc_rw_ctxt_lock);
62         if (node) {
63                 ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
64         } else {
65                 ctxt = kmalloc(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
66                                GFP_KERNEL);
67                 if (!ctxt)
68                         goto out_noctx;
69
70                 INIT_LIST_HEAD(&ctxt->rw_list);
71         }
72
73         ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
74         if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
75                                    ctxt->rw_sg_table.sgl,
76                                    SG_CHUNK_SIZE))
77                 goto out_free;
78         return ctxt;
79
80 out_free:
81         kfree(ctxt);
82 out_noctx:
83         trace_svcrdma_no_rwctx_err(rdma, sges);
84         return NULL;
85 }
86
87 static void __svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
88                                    struct svc_rdma_rw_ctxt *ctxt,
89                                    struct llist_head *list)
90 {
91         sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE);
92         llist_add(&ctxt->rw_node, list);
93 }
94
95 static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
96                                  struct svc_rdma_rw_ctxt *ctxt)
97 {
98         __svc_rdma_put_rw_ctxt(rdma, ctxt, &rdma->sc_rw_ctxts);
99 }
100
101 /**
102  * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
103  * @rdma: transport about to be destroyed
104  *
105  */
106 void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
107 {
108         struct svc_rdma_rw_ctxt *ctxt;
109         struct llist_node *node;
110
111         while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
112                 ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
113                 kfree(ctxt);
114         }
115 }
116
117 /**
118  * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
119  * @rdma: controlling transport instance
120  * @ctxt: R/W context to prepare
121  * @offset: RDMA offset
122  * @handle: RDMA tag/handle
123  * @direction: I/O direction
124  *
125  * Returns, on success, the number of WQEs that will be needed
126  * on the Send Queue, or a negative errno.
127  */
128 static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
129                                 struct svc_rdma_rw_ctxt *ctxt,
130                                 u64 offset, u32 handle,
131                                 enum dma_data_direction direction)
132 {
133         int ret;
134
135         ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
136                                ctxt->rw_sg_table.sgl, ctxt->rw_nents,
137                                0, offset, handle, direction);
138         if (unlikely(ret < 0)) {
139                 svc_rdma_put_rw_ctxt(rdma, ctxt);
140                 trace_svcrdma_dma_map_rw_err(rdma, ctxt->rw_nents, ret);
141         }
142         return ret;
143 }
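
/* Rough sketch of how the helpers above are combined by the chunk
 * builders later in this file (not a literal code path; see
 * svc_rdma_build_writes() and svc_rdma_build_read_segment() for the
 * real callers):
 *
 *	ctxt = svc_rdma_get_rw_ctxt(rdma, sges);
 *	... fill ctxt->rw_sg_table.sgl and set ctxt->rw_nents ...
 *	ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle, dir);
 *	if (ret < 0)
 *		return -EIO;
 *	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 *	cc->cc_sqecount += ret;
 *
 * The accumulated contexts are posted by svc_rdma_post_chunk_ctxt()
 * and eventually returned to the free llist by svc_rdma_cc_release().
 */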
144
145 /* A chunk context tracks all I/O for moving one Read or Write
146  * chunk. This is a set of rdma_rw's that handle data movement
147  * for all segments of one chunk.
148  *
149  * These are small, acquired with a single allocator call, and
150  * no more than one is needed per chunk. They are allocated on
151  * demand, and not cached.
152  */
153 struct svc_rdma_chunk_ctxt {
154         struct rpc_rdma_cid     cc_cid;
155         struct ib_cqe           cc_cqe;
156         struct svcxprt_rdma     *cc_rdma;
157         struct list_head        cc_rwctxts;
158         int                     cc_sqecount;
159         enum ib_wc_status       cc_status;
160         struct completion       cc_done;
161 };
162
163 static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma,
164                                  struct rpc_rdma_cid *cid)
165 {
166         cid->ci_queue_id = rdma->sc_sq_cq->res.id;
167         cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
168 }
169
170 static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
171                              struct svc_rdma_chunk_ctxt *cc)
172 {
173         svc_rdma_cc_cid_init(rdma, &cc->cc_cid);
174         cc->cc_rdma = rdma;
175
176         INIT_LIST_HEAD(&cc->cc_rwctxts);
177         cc->cc_sqecount = 0;
178 }
179
180 /*
181  * The consumed rw_ctx's are cleaned and placed on a local llist so
182  * that only one atomic llist operation is needed to put them all
183  * back on the free list.
184  */
185 static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
186                                 enum dma_data_direction dir)
187 {
188         struct svcxprt_rdma *rdma = cc->cc_rdma;
189         struct llist_node *first, *last;
190         struct svc_rdma_rw_ctxt *ctxt;
191         LLIST_HEAD(free);
192
193         first = last = NULL;
194         while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
195                 list_del(&ctxt->rw_list);
196
197                 rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
198                                     rdma->sc_port_num, ctxt->rw_sg_table.sgl,
199                                     ctxt->rw_nents, dir);
200                 __svc_rdma_put_rw_ctxt(rdma, ctxt, &free);
201
202                 ctxt->rw_node.next = first;
203                 first = &ctxt->rw_node;
204                 if (!last)
205                         last = first;
206         }
207         if (first)
208                 llist_add_batch(first, last, &rdma->sc_rw_ctxts);
209 }
210
211 /* State for sending a Write or Reply chunk.
212  *  - Tracks progress of writing one chunk over all its segments
213  *  - Stores arguments for the SGL constructor functions
214  */
215 struct svc_rdma_write_info {
216         const struct svc_rdma_chunk     *wi_chunk;
217
218         /* write state of this chunk */
219         unsigned int            wi_seg_off;
220         unsigned int            wi_seg_no;
221
222         /* SGL constructor arguments */
223         const struct xdr_buf    *wi_xdr;
224         unsigned char           *wi_base;
225         unsigned int            wi_next_off;
226
227         struct svc_rdma_chunk_ctxt      wi_cc;
228 };
229
230 static struct svc_rdma_write_info *
231 svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
232                           const struct svc_rdma_chunk *chunk)
233 {
234         struct svc_rdma_write_info *info;
235
236         info = kmalloc(sizeof(*info), GFP_KERNEL);
237         if (!info)
238                 return info;
239
240         info->wi_chunk = chunk;
241         info->wi_seg_off = 0;
242         info->wi_seg_no = 0;
243         svc_rdma_cc_init(rdma, &info->wi_cc);
244         info->wi_cc.cc_cqe.done = svc_rdma_write_done;
245         return info;
246 }
247
248 static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
249 {
250         svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
251         kfree(info);
252 }
253
254 /**
255  * svc_rdma_write_done - Write chunk completion
256  * @cq: controlling Completion Queue
257  * @wc: Work Completion
258  *
259  * Pages under I/O are freed by a subsequent Send completion.
260  */
261 static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
262 {
263         struct ib_cqe *cqe = wc->wr_cqe;
264         struct svc_rdma_chunk_ctxt *cc =
265                         container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
266         struct svcxprt_rdma *rdma = cc->cc_rdma;
267         struct svc_rdma_write_info *info =
268                         container_of(cc, struct svc_rdma_write_info, wi_cc);
269
270         trace_svcrdma_wc_write(wc, &cc->cc_cid);
271
272         svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
273
274         if (unlikely(wc->status != IB_WC_SUCCESS))
275                 svc_xprt_deferred_close(&rdma->sc_xprt);
276
277         svc_rdma_write_info_free(info);
278 }
279
280 /* State for pulling a Read chunk.
281  */
282 struct svc_rdma_read_info {
283         struct svc_rqst                 *ri_rqst;
284         struct svc_rdma_recv_ctxt       *ri_readctxt;
285         unsigned int                    ri_pageno;
286         unsigned int                    ri_pageoff;
287         unsigned int                    ri_totalbytes;
288
289         struct svc_rdma_chunk_ctxt      ri_cc;
290 };
291
292 static struct svc_rdma_read_info *
293 svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
294 {
295         struct svc_rdma_read_info *info;
296
297         info = kmalloc(sizeof(*info), GFP_KERNEL);
298         if (!info)
299                 return info;
300
301         svc_rdma_cc_init(rdma, &info->ri_cc);
302         info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
303         return info;
304 }
305
306 static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
307 {
308         svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
309         kfree(info);
310 }
311
312 /**
313  * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
314  * @cq: controlling Completion Queue
315  * @wc: Work Completion
316  *
317  */
318 static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
319 {
320         struct ib_cqe *cqe = wc->wr_cqe;
321         struct svc_rdma_chunk_ctxt *cc =
322                         container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
323         struct svcxprt_rdma *rdma = cc->cc_rdma;
324
325         trace_svcrdma_wc_read(wc, &cc->cc_cid);
326
327         svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
328         cc->cc_status = wc->status;
329         complete(&cc->cc_done);
330         return;
331 }
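
/* Unlike Write completions, Read completions are consumed synchronously:
 * svc_rdma_process_read_list() waits on cc_done and then inspects
 * cc_status before rebuilding rqstp->rq_arg.
 */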
332
333 /* This function sleeps when the transport's Send Queue is congested.
334  *
335  * Assumptions:
336  * - If ib_post_send() succeeds, only one completion is expected,
337  *   even if one or more WRs are flushed. This is true when posting
338  *   an rdma_rw_ctx or when posting a single signaled WR.
339  */
340 static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
341 {
342         struct svcxprt_rdma *rdma = cc->cc_rdma;
343         struct ib_send_wr *first_wr;
344         const struct ib_send_wr *bad_wr;
345         struct list_head *tmp;
346         struct ib_cqe *cqe;
347         int ret;
348
349         if (cc->cc_sqecount > rdma->sc_sq_depth)
350                 return -EINVAL;
351
352         first_wr = NULL;
353         cqe = &cc->cc_cqe;
354         list_for_each(tmp, &cc->cc_rwctxts) {
355                 struct svc_rdma_rw_ctxt *ctxt;
356
357                 ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
358                 first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
359                                            rdma->sc_port_num, cqe, first_wr);
360                 cqe = NULL;
361         }
362
363         do {
364                 if (atomic_sub_return(cc->cc_sqecount,
365                                       &rdma->sc_sq_avail) > 0) {
366                         ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
367                         if (ret)
368                                 break;
369                         return 0;
370                 }
371
372                 percpu_counter_inc(&svcrdma_stat_sq_starve);
373                 trace_svcrdma_sq_full(rdma);
374                 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
375                 wait_event(rdma->sc_send_wait,
376                            atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
377                 trace_svcrdma_sq_retry(rdma);
378         } while (1);
379
380         trace_svcrdma_sq_post_err(rdma, ret);
381         svc_xprt_deferred_close(&rdma->sc_xprt);
382
383         /* If even one was posted, there will be a completion. */
384         if (bad_wr != first_wr)
385                 return 0;
386
387         atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
388         wake_up(&rdma->sc_send_wait);
389         return -ENOTCONN;
390 }
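
/* SQE accounting illustration (numbers are hypothetical): if
 * sc_sq_avail is 8 and cc_sqecount is 10, the atomic_sub_return()
 * above leaves -2, so the reservation is backed out and the task
 * sleeps on sc_send_wait until more than 10 SQEs are free. Once the
 * subtraction leaves a positive value, the WR chain is posted; the
 * completion handlers then call svc_rdma_wake_send_waiters() with
 * cc_sqecount so that waiting senders can make progress.
 */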
391
392 /* Build and DMA-map an SGL that covers one kvec in an xdr_buf
393  */
394 static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
395                                unsigned int len,
396                                struct svc_rdma_rw_ctxt *ctxt)
397 {
398         struct scatterlist *sg = ctxt->rw_sg_table.sgl;
399
400         sg_set_buf(&sg[0], info->wi_base, len);
401         info->wi_base += len;
402
403         ctxt->rw_nents = 1;
404 }
405
406 /* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
407  */
408 static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
409                                     unsigned int remaining,
410                                     struct svc_rdma_rw_ctxt *ctxt)
411 {
412         unsigned int sge_no, sge_bytes, page_off, page_no;
413         const struct xdr_buf *xdr = info->wi_xdr;
414         struct scatterlist *sg;
415         struct page **page;
416
417         page_off = info->wi_next_off + xdr->page_base;
418         page_no = page_off >> PAGE_SHIFT;
419         page_off = offset_in_page(page_off);
420         page = xdr->pages + page_no;
421         info->wi_next_off += remaining;
422         sg = ctxt->rw_sg_table.sgl;
423         sge_no = 0;
424         do {
425                 sge_bytes = min_t(unsigned int, remaining,
426                                   PAGE_SIZE - page_off);
427                 sg_set_page(sg, *page, sge_bytes, page_off);
428
429                 remaining -= sge_bytes;
430                 sg = sg_next(sg);
431                 page_off = 0;
432                 sge_no++;
433                 page++;
434         } while (remaining);
435
436         ctxt->rw_nents = sge_no;
437 }
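
/* Example (assuming 4KB pages): a 10000-byte region that begins at
 * byte 512 of its first page is mapped as three SG entries of 3584,
 * 4096, and 2320 bytes, leaving ctxt->rw_nents set to 3.
 */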
438
439 /* Construct RDMA Write WRs to send a portion of an xdr_buf containing
440  * an RPC Reply.
441  */
442 static int
443 svc_rdma_build_writes(struct svc_rdma_write_info *info,
444                       void (*constructor)(struct svc_rdma_write_info *info,
445                                           unsigned int len,
446                                           struct svc_rdma_rw_ctxt *ctxt),
447                       unsigned int remaining)
448 {
449         struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
450         struct svcxprt_rdma *rdma = cc->cc_rdma;
451         const struct svc_rdma_segment *seg;
452         struct svc_rdma_rw_ctxt *ctxt;
453         int ret;
454
455         do {
456                 unsigned int write_len;
457                 u64 offset;
458
459                 if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
460                         goto out_overflow;
461                 seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
462
463                 write_len = min(remaining, seg->rs_length - info->wi_seg_off);
464                 if (!write_len)
465                         goto out_overflow;
466                 ctxt = svc_rdma_get_rw_ctxt(rdma,
467                                             (write_len >> PAGE_SHIFT) + 2);
468                 if (!ctxt)
469                         return -ENOMEM;
470
471                 constructor(info, write_len, ctxt);
472                 offset = seg->rs_offset + info->wi_seg_off;
473                 ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
474                                            DMA_TO_DEVICE);
475                 if (ret < 0)
476                         return -EIO;
477                 percpu_counter_inc(&svcrdma_stat_write);
478
479                 list_add(&ctxt->rw_list, &cc->cc_rwctxts);
480                 cc->cc_sqecount += ret;
481                 if (write_len == seg->rs_length - info->wi_seg_off) {
482                         info->wi_seg_no++;
483                         info->wi_seg_off = 0;
484                 } else {
485                         info->wi_seg_off += write_len;
486                 }
487                 remaining -= write_len;
488         } while (remaining);
489
490         return 0;
491
492 out_overflow:
493         trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no,
494                                      info->wi_chunk->ch_segcount);
495         return -E2BIG;
496 }
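
/* Example: writing a 12KB payload into a Write chunk made up of two
 * 8KB segments produces two WR chains: one that writes 8KB into the
 * first segment (advancing wi_seg_no), and one that writes the
 * remaining 4KB into the start of the second segment (leaving
 * wi_seg_off at 4KB). Each chain is initialized with its own
 * segment's rs_handle, per the comment at the top of this file.
 */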
497
498 /**
499  * svc_rdma_iov_write - Construct RDMA Writes from an iov
500  * @info: pointer to write arguments
501  * @iov: kvec to write
502  *
503  * Returns:
504  *   On success, returns zero
505  *   %-E2BIG if the client-provided Write chunk is too small
506  *   %-ENOMEM if a resource has been exhausted
507  *   %-EIO if an rdma-rw error occurred
508  */
509 static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
510                               const struct kvec *iov)
511 {
512         info->wi_base = iov->iov_base;
513         return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
514                                      iov->iov_len);
515 }
516
517 /**
518  * svc_rdma_pages_write - Construct RDMA Writes from pages
519  * @info: pointer to write arguments
520  * @xdr: xdr_buf with pages to write
521  * @offset: offset into the content of @xdr
522  * @length: number of bytes to write
523  *
524  * Returns:
525  *   On success, returns zero
526  *   %-E2BIG if the client-provided Write chunk is too small
527  *   %-ENOMEM if a resource has been exhausted
528  *   %-EIO if an rdma-rw error occurred
529  */
530 static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
531                                 const struct xdr_buf *xdr,
532                                 unsigned int offset,
533                                 unsigned long length)
534 {
535         info->wi_xdr = xdr;
536         info->wi_next_off = offset - xdr->head[0].iov_len;
537         return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
538                                      length);
539 }
540
541 /**
542  * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
543  * @xdr: xdr_buf to write
544  * @data: pointer to write arguments
545  *
546  * Returns:
547  *   On success, returns zero
548  *   %-E2BIG if the client-provided Write chunk is too small
549  *   %-ENOMEM if a resource has been exhausted
550  *   %-EIO if an rdma-rw error occurred
551  */
552 static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
553 {
554         struct svc_rdma_write_info *info = data;
555         int ret;
556
557         if (xdr->head[0].iov_len) {
558                 ret = svc_rdma_iov_write(info, &xdr->head[0]);
559                 if (ret < 0)
560                         return ret;
561         }
562
563         if (xdr->page_len) {
564                 ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
565                                            xdr->page_len);
566                 if (ret < 0)
567                         return ret;
568         }
569
570         if (xdr->tail[0].iov_len) {
571                 ret = svc_rdma_iov_write(info, &xdr->tail[0]);
572                 if (ret < 0)
573                         return ret;
574         }
575
576         return xdr->len;
577 }
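
/* On success, svc_rdma_xb_write() returns xdr->len rather than zero;
 * svc_rdma_send_write_chunk() below relies on that to verify that the
 * entire xdr_buf was consumed before posting the WR chains.
 */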
578
579 /**
580  * svc_rdma_send_write_chunk - Write all segments in a Write chunk
581  * @rdma: controlling RDMA transport
582  * @chunk: Write chunk provided by the client
583  * @xdr: xdr_buf containing the data payload
584  *
585  * Returns a non-negative number of bytes the chunk consumed, or
586  *      %-E2BIG if the payload was larger than the Write chunk,
587  *      %-EINVAL if client provided too many segments,
588  *      %-ENOMEM if rdma_rw context pool was exhausted,
589  *      %-ENOTCONN if posting failed (connection is lost),
590  *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
591  */
592 int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
593                               const struct svc_rdma_chunk *chunk,
594                               const struct xdr_buf *xdr)
595 {
596         struct svc_rdma_write_info *info;
597         struct svc_rdma_chunk_ctxt *cc;
598         int ret;
599
600         info = svc_rdma_write_info_alloc(rdma, chunk);
601         if (!info)
602                 return -ENOMEM;
603         cc = &info->wi_cc;
604
605         ret = svc_rdma_xb_write(xdr, info);
606         if (ret != xdr->len)
607                 goto out_err;
608
609         trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
610         ret = svc_rdma_post_chunk_ctxt(cc);
611         if (ret < 0)
612                 goto out_err;
613         return xdr->len;
614
615 out_err:
616         svc_rdma_write_info_free(info);
617         return ret;
618 }
619
620 /**
621  * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
622  * @rdma: controlling RDMA transport
623  * @rctxt: Write and Reply chunks from client
624  * @xdr: xdr_buf containing an RPC Reply
625  *
626  * Returns a non-negative number of bytes the chunk consumed, or
627  *      %-E2BIG if the payload was larger than the Reply chunk,
628  *      %-EINVAL if client provided too many segments,
629  *      %-ENOMEM if rdma_rw context pool was exhausted,
630  *      %-ENOTCONN if posting failed (connection is lost),
631  *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
632  */
633 int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
634                               const struct svc_rdma_recv_ctxt *rctxt,
635                               const struct xdr_buf *xdr)
636 {
637         struct svc_rdma_write_info *info;
638         struct svc_rdma_chunk_ctxt *cc;
639         struct svc_rdma_chunk *chunk;
640         int ret;
641
642         if (pcl_is_empty(&rctxt->rc_reply_pcl))
643                 return 0;
644
645         chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
646         info = svc_rdma_write_info_alloc(rdma, chunk);
647         if (!info)
648                 return -ENOMEM;
649         cc = &info->wi_cc;
650
651         ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
652                                       svc_rdma_xb_write, info);
653         if (ret < 0)
654                 goto out_err;
655
656         trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
657         ret = svc_rdma_post_chunk_ctxt(cc);
658         if (ret < 0)
659                 goto out_err;
660
661         return xdr->len;
662
663 out_err:
664         svc_rdma_write_info_free(info);
665         return ret;
666 }
667
668 /**
669  * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
670  * @info: context for ongoing I/O
671  * @segment: co-ordinates of remote memory to be read
672  *
673  * Returns:
674  *   %0: the Read WR chain was constructed successfully
675  *   %-EINVAL: there were not enough rq_pages to finish
676  *   %-ENOMEM: allocating local resources failed
677  *   %-EIO: a DMA mapping error occurred
678  */
679 static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
680                                        const struct svc_rdma_segment *segment)
681 {
682         struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
683         struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
684         struct svc_rqst *rqstp = info->ri_rqst;
685         unsigned int sge_no, seg_len, len;
686         struct svc_rdma_rw_ctxt *ctxt;
687         struct scatterlist *sg;
688         int ret;
689
690         len = segment->rs_length;
691         sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
692         ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
693         if (!ctxt)
694                 return -ENOMEM;
695         ctxt->rw_nents = sge_no;
696
697         sg = ctxt->rw_sg_table.sgl;
698         for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
699                 seg_len = min_t(unsigned int, len,
700                                 PAGE_SIZE - info->ri_pageoff);
701
702                 if (!info->ri_pageoff)
703                         head->rc_page_count++;
704
705                 sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
706                             seg_len, info->ri_pageoff);
707                 sg = sg_next(sg);
708
709                 info->ri_pageoff += seg_len;
710                 if (info->ri_pageoff == PAGE_SIZE) {
711                         info->ri_pageno++;
712                         info->ri_pageoff = 0;
713                 }
714                 len -= seg_len;
715
716                 /* Safety check */
717                 if (len &&
718                     &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
719                         goto out_overrun;
720         }
721
722         ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset,
723                                    segment->rs_handle, DMA_FROM_DEVICE);
724         if (ret < 0)
725                 return -EIO;
726         percpu_counter_inc(&svcrdma_stat_read);
727
728         list_add(&ctxt->rw_list, &cc->cc_rwctxts);
729         cc->cc_sqecount += ret;
730         return 0;
731
732 out_overrun:
733         trace_svcrdma_page_overrun_err(cc->cc_rdma, rqstp, info->ri_pageno);
734         return -EINVAL;
735 }
736
737 /**
738  * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
739  * @info: context for ongoing I/O
740  * @chunk: Read chunk to pull
741  *
742  * Return values:
743  *   %0: the Read WR chain was constructed successfully
744  *   %-EINVAL: there were not enough resources to finish
745  *   %-ENOMEM: allocating local resources failed
746  *   %-EIO: a DMA mapping error occurred
747  */
748 static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
749                                      const struct svc_rdma_chunk *chunk)
750 {
751         const struct svc_rdma_segment *segment;
752         int ret;
753
754         ret = -EINVAL;
755         pcl_for_each_segment(segment, chunk) {
756                 ret = svc_rdma_build_read_segment(info, segment);
757                 if (ret < 0)
758                         break;
759                 info->ri_totalbytes += segment->rs_length;
760         }
761         return ret;
762 }
763
764 /**
765  * svc_rdma_copy_inline_range - Copy part of the inline content into pages
766  * @info: context for RDMA Reads
767  * @offset: offset into the Receive buffer of region to copy
768  * @remaining: length of region to copy
769  *
770  * Take a page at a time from rqstp->rq_pages and copy the inline
771  * content from the Receive buffer into that page. Update
772  * info->ri_pageno and info->ri_pageoff so that the next RDMA Read
773  * result will land contiguously with the copied content.
774  *
775  * Return values:
776  *   %0: Inline content was successfully copied
777  *   %-EINVAL: offset or length was incorrect
778  */
779 static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
780                                       unsigned int offset,
781                                       unsigned int remaining)
782 {
783         struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
784         unsigned char *dst, *src = head->rc_recv_buf;
785         struct svc_rqst *rqstp = info->ri_rqst;
786         unsigned int page_no, numpages;
787
788         numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT;
789         for (page_no = 0; page_no < numpages; page_no++) {
790                 unsigned int page_len;
791
792                 page_len = min_t(unsigned int, remaining,
793                                  PAGE_SIZE - info->ri_pageoff);
794
795                 if (!info->ri_pageoff)
796                         head->rc_page_count++;
797
798                 dst = page_address(rqstp->rq_pages[info->ri_pageno]);
799                 memcpy(dst + info->ri_pageoff, src + offset, page_len);
800
801                 info->ri_totalbytes += page_len;
802                 info->ri_pageoff += page_len;
803                 if (info->ri_pageoff == PAGE_SIZE) {
804                         info->ri_pageno++;
805                         info->ri_pageoff = 0;
806                 }
807                 remaining -= page_len;
808                 offset += page_len;
809         }
810
811         return 0;
812 }
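
/* Example (assuming 4KB pages): copying 5000 bytes of inline content
 * when ri_pageoff is 0 consumes two rq_pages (4096 bytes, then 904
 * bytes) and leaves ri_pageoff at 904, so the next RDMA Read result
 * lands immediately after the copied data.
 */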
813
814 /**
815  * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
816  * @info: context for RDMA Reads
817  *
818  * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
819  * like an incoming TCP call.
820  *
821  * Return values:
822  *   %0: RDMA Read WQEs were successfully built
823  *   %-EINVAL: client provided too many chunks or segments,
824  *   %-ENOMEM: rdma_rw context pool was exhausted,
825  *   %-ENOTCONN: posting failed (connection is lost),
826  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
827  */
828 static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info)
829 {
830         struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
831         const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
832         struct xdr_buf *buf = &info->ri_rqst->rq_arg;
833         struct svc_rdma_chunk *chunk, *next;
834         unsigned int start, length;
835         int ret;
836
837         start = 0;
838         chunk = pcl_first_chunk(pcl);
839         length = chunk->ch_position;
840         ret = svc_rdma_copy_inline_range(info, start, length);
841         if (ret < 0)
842                 return ret;
843
844         pcl_for_each_chunk(chunk, pcl) {
845                 ret = svc_rdma_build_read_chunk(info, chunk);
846                 if (ret < 0)
847                         return ret;
848
849                 next = pcl_next_chunk(pcl, chunk);
850                 if (!next)
851                         break;
852
853                 start += length;
854                 length = next->ch_position - info->ri_totalbytes;
855                 ret = svc_rdma_copy_inline_range(info, start, length);
856                 if (ret < 0)
857                         return ret;
858         }
859
860         start += length;
861         length = head->rc_byte_len - start;
862         ret = svc_rdma_copy_inline_range(info, start, length);
863         if (ret < 0)
864                 return ret;
865
866         buf->len += info->ri_totalbytes;
867         buf->buflen += info->ri_totalbytes;
868
869         buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]);
870         buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
871         buf->pages = &info->ri_rqst->rq_pages[1];
872         buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
873         return 0;
874 }
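
/* Example: for a Read list carrying two data-item chunks, the function
 * above copies the inline content that precedes the first chunk
 * position, pulls the first chunk, copies the inline content that
 * separates the two chunks, pulls the second chunk, and finally copies
 * whatever inline content remains. rq_arg then looks like a message
 * that arrived over TCP, as the kernel-doc above notes.
 */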
875
876 /**
877  * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
878  * @info: context for RDMA Reads
879  *
880  * The chunk data lands in the page list of rqstp->rq_arg.pages.
881  *
882  * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
883  * Therefore, XDR round-up of the Read chunk and trailing
884  * inline content must both be added at the end of the pagelist.
885  *
886  * Return values:
887  *   %0: RDMA Read WQEs were successfully built
888  *   %-EINVAL: client provided too many chunks or segments,
889  *   %-ENOMEM: rdma_rw context pool was exhausted,
890  *   %-ENOTCONN: posting failed (connection is lost),
891  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
892  */
893 static int svc_rdma_read_data_item(struct svc_rdma_read_info *info)
894 {
895         struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
896         struct xdr_buf *buf = &info->ri_rqst->rq_arg;
897         struct svc_rdma_chunk *chunk;
898         unsigned int length;
899         int ret;
900
901         chunk = pcl_first_chunk(&head->rc_read_pcl);
902         ret = svc_rdma_build_read_chunk(info, chunk);
903         if (ret < 0)
904                 goto out;
905
906         /* Split the Receive buffer between the head and tail
907          * buffers at Read chunk's position. XDR roundup of the
908          * chunk is not included in either the pagelist or in
909          * the tail.
910          */
911         buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
912         buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
913         buf->head[0].iov_len = chunk->ch_position;
914
915         /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
916          *
917          * If the client already rounded up the chunk length, the
918          * length does not change. Otherwise, the length of the page
919          * list is increased to include XDR round-up.
920          *
921          * Currently these chunks always start at page offset 0,
922          * thus the rounded-up length never crosses a page boundary.
923          */
924         buf->pages = &info->ri_rqst->rq_pages[0];
925         length = xdr_align_size(chunk->ch_length);
926         buf->page_len = length;
927         buf->len += length;
928         buf->buflen += length;
929
930 out:
931         return ret;
932 }
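
/* Example: if the single Read chunk sits at XDR position 96, the first
 * 96 bytes of the Receive buffer remain in head[0], the chunk data
 * (rounded up to an XDR boundary) becomes the page list, and the
 * remainder of the original head kvec becomes tail[0].
 */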
933
934 /**
935  * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk
936  * @info: context for RDMA Reads
937  * @chunk: parsed Call chunk to pull
938  * @offset: offset of region to pull
939  * @length: length of region to pull
940  *
941  * Return values:
942  *   %0: RDMA Read WQEs were successfully built
943  *   %-EINVAL: there were not enough resources to finish
944  *   %-ENOMEM: rdma_rw context pool was exhausted,
945  *   %-ENOTCONN: posting failed (connection is lost),
946  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
947  */
948 static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
949                                      const struct svc_rdma_chunk *chunk,
950                                      unsigned int offset, unsigned int length)
951 {
952         const struct svc_rdma_segment *segment;
953         int ret;
954
955         ret = -EINVAL;
956         pcl_for_each_segment(segment, chunk) {
957                 struct svc_rdma_segment dummy;
958
959                 if (offset > segment->rs_length) {
960                         offset -= segment->rs_length;
961                         continue;
962                 }
963
964                 dummy.rs_handle = segment->rs_handle;
965                 dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
966                 dummy.rs_offset = segment->rs_offset + offset;
967
968                 ret = svc_rdma_build_read_segment(info, &dummy);
969                 if (ret < 0)
970                         break;
971
972                 info->ri_totalbytes += dummy.rs_length;
973                 length -= dummy.rs_length;
974                 offset = 0;
975         }
976         return ret;
977 }
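
/* Example: pulling a 6000-byte range that begins 1000 bytes into a
 * chunk built from 4096-byte segments produces two Reads: 3096 bytes
 * starting at offset 1000 of the first segment, then 2904 bytes from
 * the start of the second segment.
 */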
978
979 /**
980  * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
981  * @info: context for RDMA Reads
982  *
983  * Return values:
984  *   %0: RDMA Read WQEs were successfully built
985  *   %-EINVAL: there were not enough resources to finish
986  *   %-ENOMEM: rdma_rw context pool was exhausted,
987  *   %-ENOTCONN: posting failed (connection is lost),
988  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
989  */
990 static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
991 {
992         struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
993         const struct svc_rdma_chunk *call_chunk =
994                         pcl_first_chunk(&head->rc_call_pcl);
995         const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
996         struct svc_rdma_chunk *chunk, *next;
997         unsigned int start, length;
998         int ret;
999
1000         if (pcl_is_empty(pcl))
1001                 return svc_rdma_build_read_chunk(info, call_chunk);
1002
1003         start = 0;
1004         chunk = pcl_first_chunk(pcl);
1005         length = chunk->ch_position;
1006         ret = svc_rdma_read_chunk_range(info, call_chunk, start, length);
1007         if (ret < 0)
1008                 return ret;
1009
1010         pcl_for_each_chunk(chunk, pcl) {
1011                 ret = svc_rdma_build_read_chunk(info, chunk);
1012                 if (ret < 0)
1013                         return ret;
1014
1015                 next = pcl_next_chunk(pcl, chunk);
1016                 if (!next)
1017                         break;
1018
1019                 start += length;
1020                 length = next->ch_position - info->ri_totalbytes;
1021                 ret = svc_rdma_read_chunk_range(info, call_chunk,
1022                                                 start, length);
1023                 if (ret < 0)
1024                         return ret;
1025         }
1026
1027         start += length;
1028         length = call_chunk->ch_length - start;
1029         return svc_rdma_read_chunk_range(info, call_chunk, start, length);
1030 }
1031
1032 /**
1033  * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
1034  * @info: context for RDMA Reads
1035  *
1036  * The start of the data lands in the first page just after the
1037  * Transport header, and the rest lands in rqstp->rq_arg.pages.
1038  *
1039  * Assumptions:
1040  *      - A Position Zero Read Chunk (PZRC) is never sent in an
1041  *        RDMA_MSG message, though it's allowed by spec.
1042  *
1043  * Return values:
1044  *   %0: RDMA Read WQEs were successfully built
1045  *   %-EINVAL: client provided too many chunks or segments,
1046  *   %-ENOMEM: rdma_rw context pool was exhausted,
1047  *   %-ENOTCONN: posting failed (connection is lost),
1048  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1049  */
1050 static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info)
1051 {
1052         struct xdr_buf *buf = &info->ri_rqst->rq_arg;
1053         int ret;
1054
1055         ret = svc_rdma_read_call_chunk(info);
1056         if (ret < 0)
1057                 goto out;
1058
1059         buf->len += info->ri_totalbytes;
1060         buf->buflen += info->ri_totalbytes;
1061
1062         buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]);
1063         buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
1064         buf->pages = &info->ri_rqst->rq_pages[1];
1065         buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
1066
1067 out:
1068         return ret;
1069 }
1070
1071 /**
1072  * svc_rdma_process_read_list - Pull list of Read chunks from the client
1073  * @rdma: controlling RDMA transport
1074  * @rqstp: set of pages to use as Read sink buffers
1075  * @head: pages under I/O collect here
1076  *
1077  * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
1078  * pull each Read chunk as they decode an incoming RPC message.
1079  *
1080  * On Linux, however, the server needs to have a fully-constructed RPC
1081  * message in rqstp->rq_arg when there is a positive return code from
1082  * ->xpo_recvfrom. So the Read list is safety-checked immediately when
1083  * it is received, then here the whole Read list is pulled all at once.
1084  * The ingress RPC message is fully reconstructed once all associated
1085  * RDMA Reads have completed.
1086  *
1087  * Return values:
1088  *   %1: all needed RDMA Reads were posted successfully,
1089  *   %-EINVAL: client provided too many chunks or segments,
1090  *   %-ENOMEM: rdma_rw context pool was exhausted,
1091  *   %-ENOTCONN: posting failed (connection is lost),
1092  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1093  */
1094 int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
1095                                struct svc_rqst *rqstp,
1096                                struct svc_rdma_recv_ctxt *head)
1097 {
1098         struct svc_rdma_read_info *info;
1099         struct svc_rdma_chunk_ctxt *cc;
1100         int ret;
1101
1102         info = svc_rdma_read_info_alloc(rdma);
1103         if (!info)
1104                 return -ENOMEM;
1105         cc = &info->ri_cc;
1106         info->ri_rqst = rqstp;
1107         info->ri_readctxt = head;
1108         info->ri_pageno = 0;
1109         info->ri_pageoff = 0;
1110         info->ri_totalbytes = 0;
1111
1112         if (pcl_is_empty(&head->rc_call_pcl)) {
1113                 if (head->rc_read_pcl.cl_count == 1)
1114                         ret = svc_rdma_read_data_item(info);
1115                 else
1116                         ret = svc_rdma_read_multiple_chunks(info);
1117         } else
1118                 ret = svc_rdma_read_special(info);
1119         if (ret < 0)
1120                 goto out_err;
1121
1122         trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
1123         init_completion(&cc->cc_done);
1124         ret = svc_rdma_post_chunk_ctxt(cc);
1125         if (ret < 0)
1126                 goto out_err;
1127
1128         ret = 1;
1129         wait_for_completion(&cc->cc_done);
1130         if (cc->cc_status != IB_WC_SUCCESS)
1131                 ret = -EIO;
1132
1133         /* rq_respages starts after the last arg page */
1134         rqstp->rq_respages = &rqstp->rq_pages[head->rc_page_count];
1135         rqstp->rq_next_page = rqstp->rq_respages + 1;
1136
1137         /* Ensure svc_rdma_recv_ctxt_put() does not try to release pages */
1138         head->rc_page_count = 0;
1139
1140 out_err:
1141         svc_rdma_read_info_free(info);
1142         return ret;
1143 }