// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle.  All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
 */

#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/log2.h>

#include <asm-generic/barrier.h>
#include <asm/bitops.h>

#include <rdma/ib_cm.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
				       struct rpcrdma_sendctx *sc);
static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep);
static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
static void rpcrdma_ep_get(struct rpcrdma_ep *ep);
static int rpcrdma_ep_put(struct rpcrdma_ep *ep);
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
		     gfp_t flags);
static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);

/* Wait for outstanding transport work to finish. ib_drain_qp
 * handles the drains in the wrong order for us, so open code
 * them here.
 */
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct rdma_cm_id *id = ep->re_id;

	/* Wait for rpcrdma_post_recvs() to leave its critical
	 * section.
	 */
	if (atomic_inc_return(&ep->re_receiving) > 1)
		wait_for_completion(&ep->re_done);

	/* Flush Receives, then wait for deferred Reply work
	 * to complete.
	 */
	ib_drain_rq(id->qp);

	/* Deferred Reply processing might have scheduled
	 * local invalidations.
	 */
	ib_drain_sq(id->qp);

	rpcrdma_ep_put(ep);
}

/* Ensure xprt_force_disconnect() is invoked exactly once when a
 * connection is closed or lost. (The important thing is that it
 * is invoked at least once.)
 */
void rpcrdma_force_disconnect(struct rpcrdma_ep *ep)
{
	if (atomic_add_unless(&ep->re_force_disconnect, 1, 1))
		xprt_force_disconnect(ep->re_xprt);
}

/**
 * rpcrdma_flush_disconnect - Disconnect on flushed completion
 * @r_xprt: transport to disconnect
 * @wc: work completion entry
 *
 * Must be called in process context.
 */
void rpcrdma_flush_disconnect(struct rpcrdma_xprt *r_xprt, struct ib_wc *wc)
{
	if (wc->status != IB_WC_SUCCESS)
		rpcrdma_force_disconnect(r_xprt->rx_ep);
}

/**
 * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
 * @cq: completion queue
 * @wc: WCE for a completed Send WR
 *
 */
static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_sendctx *sc =
		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
	struct rpcrdma_xprt *r_xprt = cq->cq_context;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_send(wc, &sc->sc_cid);
	rpcrdma_sendctx_put_locked(r_xprt, sc);
	rpcrdma_flush_disconnect(r_xprt, wc);
}

/**
 * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: completion queue
 * @wc: WCE for a completed Receive WR
 *
 */
static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
					       rr_cqe);
	struct rpcrdma_xprt *r_xprt = cq->cq_context;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_receive(wc, &rep->rr_cid);
	--r_xprt->rx_ep->re_receive_count;
	if (wc->status != IB_WC_SUCCESS)
		goto out_flushed;

	/* status == SUCCESS means all fields in wc are trustworthy */
	rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
	rep->rr_wc_flags = wc->wc_flags;
	rep->rr_inv_rkey = wc->ex.invalidate_rkey;

	ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
				   rdmab_addr(rep->rr_rdmabuf),
				   wc->byte_len, DMA_FROM_DEVICE);

	rpcrdma_reply_handler(rep);
	return;

out_flushed:
	rpcrdma_flush_disconnect(r_xprt, wc);
	rpcrdma_rep_put(&r_xprt->rx_buf, rep);
}

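/* Note the inline thresholds advertised in the server's RDMA-CM
 * private message, if one was provided, and clamp the local send
 * and receive inline limits to them.
 */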
static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
				      struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;
	unsigned int rsize, wsize;

	/* Default settings for RPC-over-RDMA Version One */
	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
	}

	if (rsize < ep->re_inline_recv)
		ep->re_inline_recv = rsize;
	if (wsize < ep->re_inline_send)
		ep->re_inline_send = wsize;

	rpcrdma_set_max_header_sizes(ep);
}

/**
 * rpcrdma_cm_event_handler - Handle RDMA CM events
 * @id: rdma_cm_id on which an event has occurred
 * @event: details of the event
 *
 * Called with @id's mutex held. Returns 1 if caller should
 * destroy @id, otherwise 0.
 */
static int
rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct sockaddr *sap = (struct sockaddr *)&id->route.addr.dst_addr;
	struct rpcrdma_ep *ep = id->context;

	might_sleep();

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ep->re_async_rc = 0;
		complete(&ep->re_done);
		return 0;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ep->re_async_rc = -EPROTO;
		complete(&ep->re_done);
		return 0;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ep->re_async_rc = -ENETUNREACH;
		complete(&ep->re_done);
		return 0;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		pr_info("rpcrdma: removing device %s for %pISpc\n",
			ep->re_id->device->name, sap);
		fallthrough;
	case RDMA_CM_EVENT_ADDR_CHANGE:
		ep->re_connect_status = -ENODEV;
		goto disconnected;
	case RDMA_CM_EVENT_ESTABLISHED:
		rpcrdma_ep_get(ep);
		ep->re_connect_status = 1;
		rpcrdma_update_cm_private(ep, &event->param.conn);
		trace_xprtrdma_inline_thresh(ep);
		wake_up_all(&ep->re_connect_wait);
		break;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		ep->re_connect_status = -ENOTCONN;
		goto wake_connect_worker;
	case RDMA_CM_EVENT_UNREACHABLE:
		ep->re_connect_status = -ENETUNREACH;
		goto wake_connect_worker;
	case RDMA_CM_EVENT_REJECTED:
		ep->re_connect_status = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			ep->re_connect_status = -ENOTCONN;
wake_connect_worker:
		wake_up_all(&ep->re_connect_wait);
		return 0;
	case RDMA_CM_EVENT_DISCONNECTED:
		ep->re_connect_status = -ECONNABORTED;
disconnected:
		rpcrdma_force_disconnect(ep);
		return rpcrdma_ep_put(ep);
	default:
		break;
	}

	return 0;
}

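/* Create an rdma_cm_id for @ep and synchronously resolve the
 * server's address and a route to it. Returns the new id, or an
 * ERR_PTR if allocation or resolution fails.
 */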
static struct rdma_cm_id *rpcrdma_create_id(struct rpcrdma_xprt *r_xprt,
					    struct rpcrdma_ep *ep)
{
	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rdma_cm_id *id;
	int rc;

	init_completion(&ep->re_done);

	id = rdma_create_id(xprt->xprt_net, rpcrdma_cm_event_handler, ep,
			    RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(id))
		return id;

	ep->re_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)&xprt->addr,
			       RDMA_RESOLVE_TIMEOUT);
	if (rc)
		goto out;
	rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
	if (rc < 0)
		goto out;

	rc = ep->re_async_rc;
	if (rc)
		goto out;

	ep->re_async_rc = -ETIMEDOUT;
	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
	if (rc)
		goto out;
	rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
	if (rc < 0)
		goto out;
	rc = ep->re_async_rc;
	if (rc)
		goto out;

	return id;

out:
	rdma_destroy_id(id);
	return ERR_PTR(rc);
}

static void rpcrdma_ep_destroy(struct kref *kref)
{
	struct rpcrdma_ep *ep = container_of(kref, struct rpcrdma_ep, re_kref);

	if (ep->re_id->qp) {
		rdma_destroy_qp(ep->re_id);
		ep->re_id->qp = NULL;
	}

	if (ep->re_attr.recv_cq)
		ib_free_cq(ep->re_attr.recv_cq);
	ep->re_attr.recv_cq = NULL;
	if (ep->re_attr.send_cq)
		ib_free_cq(ep->re_attr.send_cq);
	ep->re_attr.send_cq = NULL;

	if (ep->re_pd)
		ib_dealloc_pd(ep->re_pd);
	ep->re_pd = NULL;

	kfree(ep);
	module_put(THIS_MODULE);
}

static noinline void rpcrdma_ep_get(struct rpcrdma_ep *ep)
{
	kref_get(&ep->re_kref);
}

/* Returns:
 *     %0 if @ep still has a positive kref count, or
 *     %1 if @ep was destroyed successfully.
 */
static noinline int rpcrdma_ep_put(struct rpcrdma_ep *ep)
{
	return kref_put(&ep->re_kref, rpcrdma_ep_destroy);
}

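/* Allocate a new rpcrdma_ep and set up its cm_id, completion
 * queues, protection domain, and queue pair. On success, the
 * new endpoint is recorded in @r_xprt->rx_ep.
 */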
static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_connect_private *pmsg;
	struct ib_device *device;
	struct rdma_cm_id *id;
	struct rpcrdma_ep *ep;
	int rc;

	ep = kzalloc(sizeof(*ep), GFP_NOFS);
	if (!ep)
		return -ENOTCONN;
	ep->re_xprt = &r_xprt->rx_xprt;
	kref_init(&ep->re_kref);

	id = rpcrdma_create_id(r_xprt, ep);
	if (IS_ERR(id)) {
		kfree(ep);
		return PTR_ERR(id);
	}
	__module_get(THIS_MODULE);
	device = id->device;
	ep->re_id = id;
	reinit_completion(&ep->re_done);

	ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
	ep->re_inline_send = xprt_rdma_max_inline_write;
	ep->re_inline_recv = xprt_rdma_max_inline_read;
	rc = frwr_query_device(ep, device);
	if (rc)
		goto out_destroy;

	r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);

	ep->re_attr.srq = NULL;
	ep->re_attr.cap.max_inline_data = 0;
	ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	ep->re_attr.qp_type = IB_QPT_RC;
	ep->re_attr.port_num = ~0;

	ep->re_send_batch = ep->re_max_requests >> 3;
	ep->re_send_count = ep->re_send_batch;
	init_waitqueue_head(&ep->re_connect_wait);

	ep->re_attr.send_cq = ib_alloc_cq_any(device, r_xprt,
					      ep->re_attr.cap.max_send_wr,
					      IB_POLL_WORKQUEUE);
	if (IS_ERR(ep->re_attr.send_cq)) {
		rc = PTR_ERR(ep->re_attr.send_cq);
		ep->re_attr.send_cq = NULL;
		goto out_destroy;
	}

	ep->re_attr.recv_cq = ib_alloc_cq_any(device, r_xprt,
					      ep->re_attr.cap.max_recv_wr,
					      IB_POLL_WORKQUEUE);
	if (IS_ERR(ep->re_attr.recv_cq)) {
		rc = PTR_ERR(ep->re_attr.recv_cq);
		ep->re_attr.recv_cq = NULL;
		goto out_destroy;
	}
	ep->re_receive_count = 0;

	/* Initialize cma parameters */
	memset(&ep->re_remote_cma, 0, sizeof(ep->re_remote_cma));

	/* Prepare RDMA-CM private message */
	pmsg = &ep->re_cm_private;
	pmsg->cp_magic = rpcrdma_cmp_magic;
	pmsg->cp_version = RPCRDMA_CMP_VERSION;
	pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
	pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->re_inline_send);
	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->re_inline_recv);
	ep->re_remote_cma.private_data = pmsg;
	ep->re_remote_cma.private_data_len = sizeof(*pmsg);

	/* Client offers RDMA Read but does not initiate */
	ep->re_remote_cma.initiator_depth = 0;
	ep->re_remote_cma.responder_resources =
		min_t(int, U8_MAX, device->attrs.max_qp_rd_atom);

	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
	 */
	ep->re_remote_cma.retry_count = 6;

	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
	 */
	ep->re_remote_cma.flow_control = 0;
	ep->re_remote_cma.rnr_retry_count = 0;

	ep->re_pd = ib_alloc_pd(device, 0);
	if (IS_ERR(ep->re_pd)) {
		rc = PTR_ERR(ep->re_pd);
		ep->re_pd = NULL;
		goto out_destroy;
	}

	rc = rdma_create_qp(id, ep->re_pd, &ep->re_attr);
	if (rc)
		goto out_destroy;

	r_xprt->rx_ep = ep;
	return 0;

out_destroy:
	rpcrdma_ep_put(ep);
	rdma_destroy_id(id);
	return rc;
}

/**
 * rpcrdma_xprt_connect - Connect an unconnected transport
 * @r_xprt: controlling transport instance
 *
 * Returns 0 on success or a negative errno.
 */
int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_ep *ep;
	int rc;

	rc = rpcrdma_ep_create(r_xprt);
	if (rc)
		return rc;
	ep = r_xprt->rx_ep;

	xprt_clear_connected(xprt);
	rpcrdma_reset_cwnd(r_xprt);

	/* Bump the ep's reference count while there are
	 * outstanding Receives.
	 */
	rpcrdma_ep_get(ep);
	rpcrdma_post_recvs(r_xprt, 1, true);

	rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
	if (rc)
		goto out;

	if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
		xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
	wait_event_interruptible(ep->re_connect_wait,
				 ep->re_connect_status != 0);
	if (ep->re_connect_status <= 0) {
		rc = ep->re_connect_status;
		goto out;
	}

	rc = rpcrdma_sendctxs_create(r_xprt);
	if (rc) {
		rc = -ENOTCONN;
		goto out;
	}

	rc = rpcrdma_reqs_setup(r_xprt);
	if (rc) {
		rc = -ENOTCONN;
		goto out;
	}
	rpcrdma_mrs_create(r_xprt);
	frwr_wp_create(r_xprt);

out:
	trace_xprtrdma_connect(r_xprt, rc);
	return rc;
}

/**
 * rpcrdma_xprt_disconnect - Disconnect underlying transport
 * @r_xprt: controlling transport instance
 *
 * Caller serializes. Either the transport send lock is held,
 * or we're being called to destroy the transport.
 *
 * On return, @r_xprt is completely divested of all hardware
 * resources and prepared for the next ->connect operation.
 */
void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct rdma_cm_id *id;
	int rc;

	if (!ep)
		return;

	id = ep->re_id;
	rc = rdma_disconnect(id);
	trace_xprtrdma_disconnect(r_xprt, rc);

	rpcrdma_xprt_drain(r_xprt);
	rpcrdma_reps_unmap(r_xprt);
	rpcrdma_reqs_reset(r_xprt);
	rpcrdma_mrs_destroy(r_xprt);
	rpcrdma_sendctxs_destroy(r_xprt);

	if (rpcrdma_ep_put(ep))
		rdma_destroy_id(id);

	r_xprt->rx_ep = NULL;
}

/* Fixed-size circular FIFO queue. This implementation is wait-free and
 * lock-free.
 *
 * Consumer is the code path that posts Sends. This path dequeues a
 * sendctx for use by a Send operation. Multiple consumer threads
 * are serialized by the RPC transport lock, which allows only one
 * ->send_request call at a time.
 *
 * Producer is the code path that handles Send completions. This path
 * enqueues a sendctx that has been completed. Multiple producer
 * threads are serialized by the ib_poll_cq() function.
 */

/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 * queue activity, and rpcrdma_xprt_drain has flushed all remaining
 * Send requests.
 */
static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned long i;

	if (!buf->rb_sc_ctxs)
		return;
	for (i = 0; i <= buf->rb_sc_last; i++)
		kfree(buf->rb_sc_ctxs[i]);
	kfree(buf->rb_sc_ctxs);
	buf->rb_sc_ctxs = NULL;
}

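/* Allocate a single send context, sized to carry the maximum
 * number of send SGEs this connection supports, and assign it a
 * completion ID for tracing.
 */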
static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
{
	struct rpcrdma_sendctx *sc;

	sc = kzalloc(struct_size(sc, sc_sges, ep->re_attr.cap.max_send_sge),
		     GFP_KERNEL);
	if (!sc)
		return NULL;

	sc->sc_cqe.done = rpcrdma_wc_send;
	sc->sc_cid.ci_queue_id = ep->re_attr.send_cq->res.id;
	sc->sc_cid.ci_completion_id =
		atomic_inc_return(&ep->re_completion_ids);
	return sc;
}

static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long i;

	/* Maximum number of concurrent outstanding Send WRs. Capping
	 * the circular queue size stops Send Queue overflow by causing
	 * the ->send_request call to fail temporarily before too many
	 * Sends are posted.
	 */
	i = r_xprt->rx_ep->re_max_requests + RPCRDMA_MAX_BC_REQUESTS;
	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
	if (!buf->rb_sc_ctxs)
		return -ENOMEM;

	buf->rb_sc_last = i - 1;
	for (i = 0; i <= buf->rb_sc_last; i++) {
		sc = rpcrdma_sendctx_create(r_xprt->rx_ep);
		if (!sc)
			return -ENOMEM;

		buf->rb_sc_ctxs[i] = sc;
	}

	buf->rb_sc_head = 0;
	buf->rb_sc_tail = 0;
	return 0;
}

/* The sendctx queue is not guaranteed to have a size that is a
 * power of two, thus the helpers in circ_buf.h cannot be used.
 * The other option is to use modulus (%), which can be expensive.
 */
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
					  unsigned long item)
{
	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}

/**
 * rpcrdma_sendctx_get_locked - Acquire a send context
 * @r_xprt: controlling transport instance
 *
 * Returns pointer to a free send completion context; or NULL if
 * the queue is empty.
 *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
 *
 * The caller serializes calls to this function (per transport), and
 * provides an effective memory barrier that flushes the new value
 * of rb_sc_head.
 */
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long next_head;

	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);

	if (next_head == READ_ONCE(buf->rb_sc_tail))
		goto out_emptyq;

	/* ORDER: item must be accessed _before_ head is updated */
	sc = buf->rb_sc_ctxs[next_head];

	/* Releasing the lock in the caller acts as a memory
	 * barrier that flushes rb_sc_head.
	 */
	buf->rb_sc_head = next_head;

	return sc;

out_emptyq:
	/* The queue is "empty" if there have not been enough Send
	 * completions recently. This is a sign the Send Queue is
	 * backing up. Cause the caller to pause and try again.
	 */
	xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
	r_xprt->rx_stats.empty_sendctx_q++;
	return NULL;
}

/**
 * rpcrdma_sendctx_put_locked - Release a send context
 * @r_xprt: controlling transport instance
 * @sc: send context to release
 *
 * Usage: Called from Send completion to return a sendctx
 * to the queue.
 *
 * The caller serializes calls to this function (per transport).
 */
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
				       struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned long next_tail;

	/* Unmap SGEs of previously completed but unsignaled
	 * Sends by walking up the queue until @sc is found.
	 */
	next_tail = buf->rb_sc_tail;
	do {
		next_tail = rpcrdma_sendctx_next(buf, next_tail);

		/* ORDER: item must be accessed _before_ tail is updated */
		rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);

	} while (buf->rb_sc_ctxs[next_tail] != sc);

	/* Paired with READ_ONCE */
	smp_store_release(&buf->rb_sc_tail, next_tail);

	xprt_write_space(&r_xprt->rx_xprt);
}

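/* Allocate a fresh batch of MRs, one per RDMA segment that a
 * maximally-sized RPC might need, and add them to the transport's
 * free and all-MRs lists.
 */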
static void
rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	unsigned int count;

	for (count = 0; count < ep->re_max_rdma_segs; count++) {
		struct rpcrdma_mr *mr;
		int rc;

		mr = kzalloc(sizeof(*mr), GFP_NOFS);
		if (!mr)
			break;

		rc = frwr_mr_init(r_xprt, mr);
		if (rc) {
			kfree(mr);
			break;
		}

		spin_lock(&buf->rb_lock);
		rpcrdma_mr_push(mr, &buf->rb_mrs);
		list_add(&mr->mr_all, &buf->rb_all_mrs);
		spin_unlock(&buf->rb_lock);
	}

	r_xprt->rx_stats.mrs_allocated += count;
	trace_xprtrdma_createmrs(r_xprt, count);
}

static void
rpcrdma_mr_refresh_worker(struct work_struct *work)
{
	struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
						  rb_refresh_worker);
	struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
						   rx_buf);

	rpcrdma_mrs_create(r_xprt);
	xprt_write_space(&r_xprt->rx_xprt);
}

/**
 * rpcrdma_mrs_refresh - Wake the MR refresh worker
 * @r_xprt: controlling transport instance
 *
 */
void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;

	/* If there is no underlying connection, it's no use
	 * to wake the refresh worker.
	 */
	if (ep->re_connect_status == 1) {
		/* The work is scheduled on a WQ_MEM_RECLAIM
		 * workqueue in order to prevent MR allocation
		 * from recursing into NFS during direct reclaim.
		 */
		queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
	}
}

/**
 * rpcrdma_req_create - Allocate an rpcrdma_req object
 * @r_xprt: controlling r_xprt
 * @size: initial size, in bytes, of send and receive buffers
 * @flags: GFP flags passed to memory allocators
 *
 * Returns an allocated and fully initialized rpcrdma_req or NULL.
 */
struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
				       gfp_t flags)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), flags);
	if (req == NULL)
		goto out1;

	req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
	if (!req->rl_sendbuf)
		goto out2;

	req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
	if (!req->rl_recvbuf)
		goto out3;

	INIT_LIST_HEAD(&req->rl_free_mrs);
	INIT_LIST_HEAD(&req->rl_registered);
	spin_lock(&buffer->rb_lock);
	list_add(&req->rl_all, &buffer->rb_allreqs);
	spin_unlock(&buffer->rb_lock);
	return req;

out3:
	kfree(req->rl_sendbuf);
out2:
	kfree(req);
out1:
	return NULL;
}

/**
 * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req object to set up
 *
 * Returns zero on success, and a negative errno on failure.
 */
int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct rpcrdma_regbuf *rb;
	size_t maxhdrsize;

	/* Compute maximum header buffer size in bytes */
	maxhdrsize = rpcrdma_fixed_maxsz + 3 +
		     r_xprt->rx_ep->re_max_rdma_segs * rpcrdma_readchunk_maxsz;
	maxhdrsize *= sizeof(__be32);
	rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
				  DMA_TO_DEVICE, GFP_KERNEL);
	if (!rb)
		goto out;

	if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
		goto out_free;

	req->rl_rdmabuf = rb;
	xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
	return 0;

out_free:
	rpcrdma_regbuf_free(rb);
out:
	return -ENOMEM;
}

/* ASSUMPTION: the rb_allreqs list is stable for the duration,
 * and thus can be walked without holding rb_lock. Eg. the
 * caller is holding the transport send lock to exclude
 * device removal or disconnection.
 */
static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	int rc;

	list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
		rc = rpcrdma_req_setup(r_xprt, req);
		if (rc)
			return rc;
	}
	return 0;
}

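/* Release connection-specific resources held by @req so that it
 * can be reused after the next connection is established.
 */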
static void rpcrdma_req_reset(struct rpcrdma_req *req)
{
	/* Credits are valid for only one connection */
	req->rl_slot.rq_cong = 0;

	rpcrdma_regbuf_free(req->rl_rdmabuf);
	req->rl_rdmabuf = NULL;

	rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
	rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);

	frwr_reset(req);
}

/* ASSUMPTION: the rb_allreqs list is stable for the duration,
 * and thus can be walked without holding rb_lock. Eg. the
 * caller is holding the transport send lock to exclude
 * device removal or disconnection.
 */
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	list_for_each_entry(req, &buf->rb_allreqs, rl_all)
		rpcrdma_req_reset(req);
}

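/* Allocate an rpcrdma_rep and its DMA-mapped receive buffer, and
 * add it to the rb_all_reps list. Returns NULL if allocation or
 * DMA mapping fails.
 */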
static noinline
struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
				       bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;

	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
	if (rep == NULL)
		goto out;

	rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep->re_inline_recv,
					       DMA_FROM_DEVICE, GFP_KERNEL);
	if (!rep->rr_rdmabuf)
		goto out_free;

	if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
		goto out_free_regbuf;

	rep->rr_cid.ci_completion_id =
		atomic_inc_return(&r_xprt->rx_ep->re_completion_ids);

	xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
		     rdmab_length(rep->rr_rdmabuf));
	rep->rr_cqe.done = rpcrdma_wc_receive;
	rep->rr_rxprt = r_xprt;
	rep->rr_recv_wr.next = NULL;
	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
	rep->rr_recv_wr.num_sge = 1;
	rep->rr_temp = temp;

	spin_lock(&buf->rb_lock);
	list_add(&rep->rr_all, &buf->rb_all_reps);
	spin_unlock(&buf->rb_lock);
	return rep;

out_free_regbuf:
	rpcrdma_regbuf_free(rep->rr_rdmabuf);
out_free:
	kfree(rep);
out:
	return NULL;
}

static void rpcrdma_rep_free(struct rpcrdma_rep *rep)
{
	rpcrdma_regbuf_free(rep->rr_rdmabuf);
	kfree(rep);
}

static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
{
	struct rpcrdma_buffer *buf = &rep->rr_rxprt->rx_buf;

	spin_lock(&buf->rb_lock);
	list_del(&rep->rr_all);
	spin_unlock(&buf->rb_lock);

	rpcrdma_rep_free(rep);
}

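/* Remove and return the first rep on the free-rep llist, or NULL
 * if the list is empty.
 */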
static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
{
	struct llist_node *node;

	/* Calls to llist_del_first are required to be serialized */
	node = llist_del_first(&buf->rb_free_reps);
	if (!node)
		return NULL;
	return llist_entry(node, struct rpcrdma_rep, rr_node);
}

/**
 * rpcrdma_rep_put - Release rpcrdma_rep back to free list
 * @buf: buffer pool
 * @rep: rep to release
 *
 */
void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep)
{
	llist_add(&rep->rr_node, &buf->rb_free_reps);
}

/* Caller must ensure the QP is quiescent (RQ is drained) before
 * invoking this function, to guarantee rb_all_reps is not
 * changing.
 */
static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;

	list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
		rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
		rep->rr_temp = true;	/* Mark this rep for destruction */
	}
}

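/* Free every rep on the rb_all_reps list. rb_lock is dropped
 * around each rpcrdma_rep_free() call.
 */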
static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
{
	struct rpcrdma_rep *rep;

	spin_lock(&buf->rb_lock);
	while ((rep = list_first_entry_or_null(&buf->rb_all_reps,
					       struct rpcrdma_rep,
					       rr_all)) != NULL) {
		list_del(&rep->rr_all);
		spin_unlock(&buf->rb_lock);

		rpcrdma_rep_free(rep);

		spin_lock(&buf->rb_lock);
	}
	spin_unlock(&buf->rb_lock);
}

/**
 * rpcrdma_buffer_create - Create initial set of req/rep objects
 * @r_xprt: transport instance to (re)initialize
 *
 * Returns zero on success, otherwise a negative errno.
 */
int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;

	buf->rb_bc_srv_max_requests = 0;
	spin_lock_init(&buf->rb_lock);
	INIT_LIST_HEAD(&buf->rb_mrs);
	INIT_LIST_HEAD(&buf->rb_all_mrs);
	INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);

	INIT_LIST_HEAD(&buf->rb_send_bufs);
	INIT_LIST_HEAD(&buf->rb_allreqs);
	INIT_LIST_HEAD(&buf->rb_all_reps);

	rc = -ENOMEM;
	for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
		struct rpcrdma_req *req;

		req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
					 GFP_KERNEL);
		if (!req)
			goto out;
		list_add(&req->rl_list, &buf->rb_send_bufs);
	}

	init_llist_head(&buf->rb_free_reps);

	return 0;
out:
	rpcrdma_buffer_destroy(buf);
	return rc;
}

/**
 * rpcrdma_req_destroy - Destroy an rpcrdma_req object
 * @req: unused object to be destroyed
 *
 * Relies on caller holding the transport send lock to protect
 * removing req->rl_all from buf->rb_all_reqs safely.
 */
void rpcrdma_req_destroy(struct rpcrdma_req *req)
{
	struct rpcrdma_mr *mr;

	list_del(&req->rl_all);

	while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
		struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;

		spin_lock(&buf->rb_lock);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_mr_release(mr);
	}

	rpcrdma_regbuf_free(req->rl_recvbuf);
	rpcrdma_regbuf_free(req->rl_sendbuf);
	rpcrdma_regbuf_free(req->rl_rdmabuf);
	kfree(req);
}

/**
 * rpcrdma_mrs_destroy - Release all of a transport's MRs
 * @r_xprt: controlling transport instance
 *
 * Relies on caller holding the transport send lock to protect
 * removing mr->mr_list from req->rl_free_mrs safely.
 */
static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	cancel_work_sync(&buf->rb_refresh_worker);

	spin_lock(&buf->rb_lock);
	while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
					      struct rpcrdma_mr,
					      mr_all)) != NULL) {
		list_del(&mr->mr_list);
		list_del(&mr->mr_all);
		spin_unlock(&buf->rb_lock);

		frwr_mr_release(mr);

		spin_lock(&buf->rb_lock);
	}
	spin_unlock(&buf->rb_lock);
}

/**
 * rpcrdma_buffer_destroy - Release all hw resources
 * @buf: root control block for resources
 *
 * ORDERING: relies on a prior rpcrdma_xprt_drain:
 * - No more Send or Receive completions can occur
 * - All MRs, reps, and reqs are returned to their free lists
 */
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
	rpcrdma_reps_destroy(buf);

	while (!list_empty(&buf->rb_send_bufs)) {
		struct rpcrdma_req *req;

		req = list_first_entry(&buf->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
		list_del(&req->rl_list);
		rpcrdma_req_destroy(req);
	}
}

/**
 * rpcrdma_mr_get - Allocate an rpcrdma_mr object
 * @r_xprt: controlling transport
 *
 * Returns an initialized rpcrdma_mr or NULL if no free
 * rpcrdma_mr objects are available.
 */
struct rpcrdma_mr *
rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_mr *mr;

	spin_lock(&buf->rb_lock);
	mr = rpcrdma_mr_pop(&buf->rb_mrs);
	spin_unlock(&buf->rb_lock);
	return mr;
}

/**
 * rpcrdma_reply_put - Put reply buffers back into pool
 * @buffers: buffer pool
 * @req: object to return
 *
 */
void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
	if (req->rl_reply) {
		rpcrdma_rep_put(buffers, req->rl_reply);
		req->rl_reply = NULL;
	}
}

/**
 * rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;

	spin_lock(&buffers->rb_lock);
	req = list_first_entry_or_null(&buffers->rb_send_bufs,
				       struct rpcrdma_req, rl_list);
	if (req)
		list_del_init(&req->rl_list);
	spin_unlock(&buffers->rb_lock);
	return req;
}

/**
 * rpcrdma_buffer_put - Put request/reply buffers back into pool
 * @buffers: buffer pool
 * @req: object to return
 *
 */
void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
	rpcrdma_reply_put(buffers, req);

	spin_lock(&buffers->rb_lock);
	list_add(&req->rl_list, &buffers->rb_send_bufs);
	spin_unlock(&buffers->rb_lock);
}

/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via frwr_map.
 */
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
		     gfp_t flags)
{
	struct rpcrdma_regbuf *rb;

	rb = kmalloc(sizeof(*rb), flags);
	if (!rb)
		return NULL;
	rb->rg_data = kmalloc(size, flags);
	if (!rb->rg_data) {
		kfree(rb);
		return NULL;
	}

	rb->rg_device = NULL;
	rb->rg_direction = direction;
	rb->rg_iov.length = size;
	return rb;
}

/**
 * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
 * @rb: regbuf to reallocate
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns true if reallocation was successful. If false is
 * returned, @rb is left untouched.
 */
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
{
	void *buf;

	buf = kmalloc(size, flags);
	if (!buf)
		return false;

	rpcrdma_regbuf_dma_unmap(rb);
	kfree(rb->rg_data);

	rb->rg_data = buf;
	rb->rg_iov.length = size;
	return true;
}

/**
 * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
 * @r_xprt: controlling transport instance
 * @rb: regbuf to be mapped
 *
 * Returns true if the buffer is now DMA mapped to @r_xprt's device
 */
bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
			      struct rpcrdma_regbuf *rb)
{
	struct ib_device *device = r_xprt->rx_ep->re_id->device;

	if (rb->rg_direction == DMA_NONE)
		return false;

	rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
					    rdmab_length(rb), rb->rg_direction);
	if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
		trace_xprtrdma_dma_maperr(rdmab_addr(rb));
		return false;
	}

	rb->rg_device = device;
	rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
	return true;
}

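/* Unmap @rb from its device if it is currently DMA mapped. A NULL
 * or unmapped regbuf is ignored.
 */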
static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
{
	if (!rb)
		return;

	if (!rpcrdma_regbuf_is_mapped(rb))
		return;

	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
			    rb->rg_direction);
	rb->rg_device = NULL;
}

static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
{
	rpcrdma_regbuf_dma_unmap(rb);
	if (rb)
		kfree(rb->rg_data);
	kfree(rb);
}

/**
 * rpcrdma_post_recvs - Refill the Receive Queue
 * @r_xprt: controlling transport instance
 * @needed: current credit grant
 * @temp: mark Receive buffers to be deleted after one use
 *
 */
void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct ib_recv_wr *wr, *bad_wr;
	struct rpcrdma_rep *rep;
	int count, rc;

	rc = 0;
	count = 0;

	if (likely(ep->re_receive_count > needed))
		goto out;
	needed -= ep->re_receive_count;
	if (!temp)
		needed += RPCRDMA_MAX_RECV_BATCH;

	if (atomic_inc_return(&ep->re_receiving) > 1)
		goto out;

	/* fast path: all needed reps can be found on the free list */
	wr = NULL;
	while (needed) {
		rep = rpcrdma_rep_get_locked(buf);
		if (rep && rep->rr_temp) {
			rpcrdma_rep_destroy(rep);
			continue;
		}
		if (!rep)
			rep = rpcrdma_rep_create(r_xprt, temp);
		if (!rep)
			break;

		rep->rr_cid.ci_queue_id = ep->re_attr.recv_cq->res.id;
		trace_xprtrdma_post_recv(rep);
		rep->rr_recv_wr.next = wr;
		wr = &rep->rr_recv_wr;
		--needed;
		++count;
	}
	if (!wr)
		goto out;

	rc = ib_post_recv(ep->re_id->qp, wr,
			  (const struct ib_recv_wr **)&bad_wr);
	if (rc) {
		trace_xprtrdma_post_recvs_err(r_xprt, rc);
		for (wr = bad_wr; wr;) {
			struct rpcrdma_rep *rep;

			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
			wr = wr->next;
			rpcrdma_rep_put(buf, rep);
			--count;
		}
	}
	if (atomic_dec_return(&ep->re_receiving) > 0)
		complete(&ep->re_done);

out:
	trace_xprtrdma_post_recvs(r_xprt, count);
	ep->re_receive_count += count;
	return;
}