net/sunrpc/xprtrdma/verbs.c
1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2 /*
3  * Copyright (c) 2014-2017 Oracle.  All rights reserved.
4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the BSD-type
10  * license below:
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions
14  * are met:
15  *
16  *      Redistributions of source code must retain the above copyright
17  *      notice, this list of conditions and the following disclaimer.
18  *
19  *      Redistributions in binary form must reproduce the above
20  *      copyright notice, this list of conditions and the following
21  *      disclaimer in the documentation and/or other materials provided
22  *      with the distribution.
23  *
24  *      Neither the name of the Network Appliance, Inc. nor the names of
25  *      its contributors may be used to endorse or promote products
26  *      derived from this software without specific prior written
27  *      permission.
28  *
29  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
36  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
37  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
38  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
39  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  */
41
42 /*
43  * verbs.c
44  *
45  * Encapsulates the major functions managing:
46  *  o adapters
47  *  o endpoints
48  *  o connections
49  *  o buffer memory
50  */
51
52 #include <linux/interrupt.h>
53 #include <linux/slab.h>
54 #include <linux/sunrpc/addr.h>
55 #include <linux/sunrpc/svc_rdma.h>
56 #include <linux/log2.h>
57
58 #include <asm-generic/barrier.h>
59 #include <asm/bitops.h>
60
61 #include <rdma/ib_cm.h>
62
63 #include "xprt_rdma.h"
64 #include <trace/events/rpcrdma.h>
65
66 /*
67  * Globals/Macros
68  */
69
70 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
71 # define RPCDBG_FACILITY        RPCDBG_TRANS
72 #endif
73
74 /*
75  * internal functions
76  */
77 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
78 static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
79 static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
80                                        struct rpcrdma_sendctx *sc);
81 static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
82 static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
83 static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep);
84 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
85 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
86 static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
87 static void rpcrdma_ep_get(struct rpcrdma_ep *ep);
88 static int rpcrdma_ep_put(struct rpcrdma_ep *ep);
89 static struct rpcrdma_regbuf *
90 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
91                      gfp_t flags);
92 static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
93 static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
94
95 /* Wait for outstanding transport work to finish. ib_drain_qp
96  * handles the drains in the wrong order for us, so open code
97  * them here.
98  */
99 static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
100 {
101         struct rpcrdma_ep *ep = r_xprt->rx_ep;
102         struct rdma_cm_id *id = ep->re_id;
103
104         /* Wait for rpcrdma_post_recvs() to leave its critical
105          * section.
106          */
107         if (atomic_inc_return(&ep->re_receiving) > 1)
108                 wait_for_completion(&ep->re_done);
109
110         /* Flush Receives, then wait for deferred Reply work
111          * to complete.
112          */
113         ib_drain_rq(id->qp);
114
115         /* Deferred Reply processing might have scheduled
116          * local invalidations.
117          */
118         ib_drain_sq(id->qp);
119
120         rpcrdma_ep_put(ep);
121 }
122
123 /* Ensure xprt_force_disconnect() is invoked exactly once when a
124  * connection is closed or lost. (The important thing is that it is
125  * invoked "at least" once.)
126  */
127 static void rpcrdma_force_disconnect(struct rpcrdma_ep *ep)
128 {
129         if (atomic_add_unless(&ep->re_force_disconnect, 1, 1))
130                 xprt_force_disconnect(ep->re_xprt);
131 }
132
133 /**
134  * rpcrdma_flush_disconnect - Disconnect on flushed completion
135  * @r_xprt: transport to disconnect
136  * @wc: work completion entry
137  *
138  * Must be called in process context.
139  */
140 void rpcrdma_flush_disconnect(struct rpcrdma_xprt *r_xprt, struct ib_wc *wc)
141 {
142         if (wc->status != IB_WC_SUCCESS)
143                 rpcrdma_force_disconnect(r_xprt->rx_ep);
144 }
145
146 /**
147  * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
148  * @cq: completion queue
149  * @wc: WCE for a completed Send WR
150  *
151  */
152 static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
153 {
154         struct ib_cqe *cqe = wc->wr_cqe;
155         struct rpcrdma_sendctx *sc =
156                 container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
157         struct rpcrdma_xprt *r_xprt = cq->cq_context;
158
159         /* WARNING: Only wr_cqe and status are reliable at this point */
160         trace_xprtrdma_wc_send(wc, &sc->sc_cid);
161         rpcrdma_sendctx_put_locked(r_xprt, sc);
162         rpcrdma_flush_disconnect(r_xprt, wc);
163 }
164
165 /**
166  * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
167  * @cq: completion queue
168  * @wc: WCE for a completed Receive WR
169  *
170  */
171 static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
172 {
173         struct ib_cqe *cqe = wc->wr_cqe;
174         struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
175                                                rr_cqe);
176         struct rpcrdma_xprt *r_xprt = cq->cq_context;
177
178         /* WARNING: Only wr_cqe and status are reliable at this point */
179         trace_xprtrdma_wc_receive(wc, &rep->rr_cid);
180         --r_xprt->rx_ep->re_receive_count;
181         if (wc->status != IB_WC_SUCCESS)
182                 goto out_flushed;
183
184         /* status == SUCCESS means all fields in wc are trustworthy */
185         rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
186         rep->rr_wc_flags = wc->wc_flags;
187         rep->rr_inv_rkey = wc->ex.invalidate_rkey;
188
189         ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
190                                    rdmab_addr(rep->rr_rdmabuf),
191                                    wc->byte_len, DMA_FROM_DEVICE);
192
193         rpcrdma_reply_handler(rep);
194         return;
195
196 out_flushed:
197         rpcrdma_flush_disconnect(r_xprt, wc);
198         rpcrdma_rep_put(&r_xprt->rx_buf, rep);
199 }
200
201 static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
202                                       struct rdma_conn_param *param)
203 {
204         const struct rpcrdma_connect_private *pmsg = param->private_data;
205         unsigned int rsize, wsize;
206
207         /* Default settings for RPC-over-RDMA Version One */
208         ep->re_implicit_roundup = xprt_rdma_pad_optimize;
209         rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
210         wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
211
212         if (pmsg &&
213             pmsg->cp_magic == rpcrdma_cmp_magic &&
214             pmsg->cp_version == RPCRDMA_CMP_VERSION) {
215                 ep->re_implicit_roundup = true;
216                 rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
217                 wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
218         }
219
220         if (rsize < ep->re_inline_recv)
221                 ep->re_inline_recv = rsize;
222         if (wsize < ep->re_inline_send)
223                 ep->re_inline_send = wsize;
224
225         rpcrdma_set_max_header_sizes(ep);
226 }
227
228 /**
229  * rpcrdma_cm_event_handler - Handle RDMA CM events
230  * @id: rdma_cm_id on which an event has occurred
231  * @event: details of the event
232  *
233  * Called with @id's mutex held. Returns 1 if caller should
234  * destroy @id, otherwise 0.
235  */
236 static int
237 rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
238 {
239         struct sockaddr *sap = (struct sockaddr *)&id->route.addr.dst_addr;
240         struct rpcrdma_ep *ep = id->context;
241
242         might_sleep();
243
244         switch (event->event) {
245         case RDMA_CM_EVENT_ADDR_RESOLVED:
246         case RDMA_CM_EVENT_ROUTE_RESOLVED:
247                 ep->re_async_rc = 0;
248                 complete(&ep->re_done);
249                 return 0;
250         case RDMA_CM_EVENT_ADDR_ERROR:
251                 ep->re_async_rc = -EPROTO;
252                 complete(&ep->re_done);
253                 return 0;
254         case RDMA_CM_EVENT_ROUTE_ERROR:
255                 ep->re_async_rc = -ENETUNREACH;
256                 complete(&ep->re_done);
257                 return 0;
258         case RDMA_CM_EVENT_DEVICE_REMOVAL:
259                 pr_info("rpcrdma: removing device %s for %pISpc\n",
260                         ep->re_id->device->name, sap);
261                 fallthrough;
262         case RDMA_CM_EVENT_ADDR_CHANGE:
263                 ep->re_connect_status = -ENODEV;
264                 goto disconnected;
265         case RDMA_CM_EVENT_ESTABLISHED:
266                 rpcrdma_ep_get(ep);
267                 ep->re_connect_status = 1;
268                 rpcrdma_update_cm_private(ep, &event->param.conn);
269                 trace_xprtrdma_inline_thresh(ep);
270                 wake_up_all(&ep->re_connect_wait);
271                 break;
272         case RDMA_CM_EVENT_CONNECT_ERROR:
273                 ep->re_connect_status = -ENOTCONN;
274                 goto wake_connect_worker;
275         case RDMA_CM_EVENT_UNREACHABLE:
276                 ep->re_connect_status = -ENETUNREACH;
277                 goto wake_connect_worker;
278         case RDMA_CM_EVENT_REJECTED:
279                 dprintk("rpcrdma: connection to %pISpc rejected: %s\n",
280                         sap, rdma_reject_msg(id, event->status));
281                 ep->re_connect_status = -ECONNREFUSED;
282                 if (event->status == IB_CM_REJ_STALE_CONN)
283                         ep->re_connect_status = -ENOTCONN;
284 wake_connect_worker:
285                 wake_up_all(&ep->re_connect_wait);
286                 return 0;
287         case RDMA_CM_EVENT_DISCONNECTED:
288                 ep->re_connect_status = -ECONNABORTED;
289 disconnected:
290                 rpcrdma_force_disconnect(ep);
291                 return rpcrdma_ep_put(ep);
292         default:
293                 break;
294         }
295
296         dprintk("RPC:       %s: %pISpc on %s/frwr: %s\n", __func__, sap,
297                 ep->re_id->device->name, rdma_event_msg(event->event));
298         return 0;
299 }
300
301 static struct rdma_cm_id *rpcrdma_create_id(struct rpcrdma_xprt *r_xprt,
302                                             struct rpcrdma_ep *ep)
303 {
304         unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
305         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
306         struct rdma_cm_id *id;
307         int rc;
308
309         init_completion(&ep->re_done);
310
311         id = rdma_create_id(xprt->xprt_net, rpcrdma_cm_event_handler, ep,
312                             RDMA_PS_TCP, IB_QPT_RC);
313         if (IS_ERR(id))
314                 return id;
315
316         ep->re_async_rc = -ETIMEDOUT;
317         rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)&xprt->addr,
318                                RDMA_RESOLVE_TIMEOUT);
319         if (rc)
320                 goto out;
321         rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
322         if (rc < 0)
323                 goto out;
324
325         rc = ep->re_async_rc;
326         if (rc)
327                 goto out;
328
329         ep->re_async_rc = -ETIMEDOUT;
330         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
331         if (rc)
332                 goto out;
333         rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
334         if (rc < 0)
335                 goto out;
336         rc = ep->re_async_rc;
337         if (rc)
338                 goto out;
339
340         return id;
341
342 out:
343         rdma_destroy_id(id);
344         return ERR_PTR(rc);
345 }
346
347 static void rpcrdma_ep_destroy(struct kref *kref)
348 {
349         struct rpcrdma_ep *ep = container_of(kref, struct rpcrdma_ep, re_kref);
350
351         if (ep->re_id->qp) {
352                 rdma_destroy_qp(ep->re_id);
353                 ep->re_id->qp = NULL;
354         }
355
356         if (ep->re_attr.recv_cq)
357                 ib_free_cq(ep->re_attr.recv_cq);
358         ep->re_attr.recv_cq = NULL;
359         if (ep->re_attr.send_cq)
360                 ib_free_cq(ep->re_attr.send_cq);
361         ep->re_attr.send_cq = NULL;
362
363         if (ep->re_pd)
364                 ib_dealloc_pd(ep->re_pd);
365         ep->re_pd = NULL;
366
367         kfree(ep);
368         module_put(THIS_MODULE);
369 }
370
371 static noinline void rpcrdma_ep_get(struct rpcrdma_ep *ep)
372 {
373         kref_get(&ep->re_kref);
374 }
375
376 /* Returns:
377  *     %0 if @ep still has a positive kref count, or
378  *     %1 if @ep was destroyed successfully.
379  */
380 static noinline int rpcrdma_ep_put(struct rpcrdma_ep *ep)
381 {
382         return kref_put(&ep->re_kref, rpcrdma_ep_destroy);
383 }
384
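/* Summary of the rpcrdma_ep reference-count lifecycle as implemented in
 * this file (descriptive only, no behavior change):
 *
 *   kref_init()      - rpcrdma_ep_create(); the count starts at one
 *   rpcrdma_ep_get() - on RDMA_CM_EVENT_ESTABLISHED, and in
 *                      rpcrdma_xprt_connect() while Receives are posted
 *   rpcrdma_ep_put() - in the CM handler's disconnected: path, in
 *                      rpcrdma_xprt_drain(), in rpcrdma_xprt_disconnect(),
 *                      and on rpcrdma_ep_create()'s error path
 *
 * The final put runs rpcrdma_ep_destroy(), which tears down the QP,
 * both CQs, and the PD.
 */
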
385 static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
386 {
387         struct rpcrdma_connect_private *pmsg;
388         struct ib_device *device;
389         struct rdma_cm_id *id;
390         struct rpcrdma_ep *ep;
391         int rc;
392
393         ep = kzalloc(sizeof(*ep), GFP_NOFS);
394         if (!ep)
395                 return -ENOTCONN;
396         ep->re_xprt = &r_xprt->rx_xprt;
397         kref_init(&ep->re_kref);
398
399         id = rpcrdma_create_id(r_xprt, ep);
400         if (IS_ERR(id)) {
401                 kfree(ep);
402                 return PTR_ERR(id);
403         }
404         __module_get(THIS_MODULE);
405         device = id->device;
406         ep->re_id = id;
407         reinit_completion(&ep->re_done);
408
409         ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
410         ep->re_inline_send = xprt_rdma_max_inline_write;
411         ep->re_inline_recv = xprt_rdma_max_inline_read;
412         rc = frwr_query_device(ep, device);
413         if (rc)
414                 goto out_destroy;
415
416         r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);
417
418         ep->re_attr.srq = NULL;
419         ep->re_attr.cap.max_inline_data = 0;
420         ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
421         ep->re_attr.qp_type = IB_QPT_RC;
422         ep->re_attr.port_num = ~0;
423
424         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
425                 "iovs: send %d recv %d\n",
426                 __func__,
427                 ep->re_attr.cap.max_send_wr,
428                 ep->re_attr.cap.max_recv_wr,
429                 ep->re_attr.cap.max_send_sge,
430                 ep->re_attr.cap.max_recv_sge);
431
432         ep->re_send_batch = ep->re_max_requests >> 3;
433         ep->re_send_count = ep->re_send_batch;
434         init_waitqueue_head(&ep->re_connect_wait);
435
436         ep->re_attr.send_cq = ib_alloc_cq_any(device, r_xprt,
437                                               ep->re_attr.cap.max_send_wr,
438                                               IB_POLL_WORKQUEUE);
439         if (IS_ERR(ep->re_attr.send_cq)) {
440                 rc = PTR_ERR(ep->re_attr.send_cq);
441                 goto out_destroy;
442         }
443
444         ep->re_attr.recv_cq = ib_alloc_cq_any(device, r_xprt,
445                                               ep->re_attr.cap.max_recv_wr,
446                                               IB_POLL_WORKQUEUE);
447         if (IS_ERR(ep->re_attr.recv_cq)) {
448                 rc = PTR_ERR(ep->re_attr.recv_cq);
449                 goto out_destroy;
450         }
451         ep->re_receive_count = 0;
452
453         /* Initialize cma parameters */
454         memset(&ep->re_remote_cma, 0, sizeof(ep->re_remote_cma));
455
456         /* Prepare RDMA-CM private message */
457         pmsg = &ep->re_cm_private;
458         pmsg->cp_magic = rpcrdma_cmp_magic;
459         pmsg->cp_version = RPCRDMA_CMP_VERSION;
460         pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
461         pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->re_inline_send);
462         pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->re_inline_recv);
463         ep->re_remote_cma.private_data = pmsg;
464         ep->re_remote_cma.private_data_len = sizeof(*pmsg);
465
466         /* Client offers RDMA Read but does not initiate */
467         ep->re_remote_cma.initiator_depth = 0;
468         ep->re_remote_cma.responder_resources =
469                 min_t(int, U8_MAX, device->attrs.max_qp_rd_atom);
470
471         /* Limit transport retries so the client can detect server
472          * GID changes quickly. The RPC layer handles re-establishing
473          * the transport connection and retransmission.
474          */
475         ep->re_remote_cma.retry_count = 6;
476
477         /* RPC-over-RDMA handles its own flow control. In addition,
478          * make all RNR NAKs visible so we know that RPC-over-RDMA
479          * flow control is working correctly (no NAKs should be seen).
480          */
481         ep->re_remote_cma.flow_control = 0;
482         ep->re_remote_cma.rnr_retry_count = 0;
483
484         ep->re_pd = ib_alloc_pd(device, 0);
485         if (IS_ERR(ep->re_pd)) {
486                 rc = PTR_ERR(ep->re_pd);
487                 goto out_destroy;
488         }
489
490         rc = rdma_create_qp(id, ep->re_pd, &ep->re_attr);
491         if (rc)
492                 goto out_destroy;
493
494         r_xprt->rx_ep = ep;
495         return 0;
496
497 out_destroy:
498         rpcrdma_ep_put(ep);
499         rdma_destroy_id(id);
500         return rc;
501 }
502
503 /**
504  * rpcrdma_xprt_connect - Connect an unconnected transport
505  * @r_xprt: controlling transport instance
506  *
507  * Returns 0 on success or a negative errno.
508  */
509 int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
510 {
511         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
512         struct rpcrdma_ep *ep;
513         int rc;
514
515         rc = rpcrdma_ep_create(r_xprt);
516         if (rc)
517                 return rc;
518         ep = r_xprt->rx_ep;
519
520         xprt_clear_connected(xprt);
521         rpcrdma_reset_cwnd(r_xprt);
522
523         /* Bump the ep's reference count while there are
524          * outstanding Receives.
525          */
526         rpcrdma_ep_get(ep);
527         rpcrdma_post_recvs(r_xprt, 1, true);
528
529         rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
530         if (rc)
531                 goto out;
532
533         if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
534                 xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
535         wait_event_interruptible(ep->re_connect_wait,
536                                  ep->re_connect_status != 0);
537         if (ep->re_connect_status <= 0) {
538                 rc = ep->re_connect_status;
539                 goto out;
540         }
541
542         rc = rpcrdma_sendctxs_create(r_xprt);
543         if (rc) {
544                 rc = -ENOTCONN;
545                 goto out;
546         }
547
548         rc = rpcrdma_reqs_setup(r_xprt);
549         if (rc) {
550                 rc = -ENOTCONN;
551                 goto out;
552         }
553         rpcrdma_mrs_create(r_xprt);
554
555 out:
556         trace_xprtrdma_connect(r_xprt, rc);
557         return rc;
558 }
559
560 /**
561  * rpcrdma_xprt_disconnect - Disconnect underlying transport
562  * @r_xprt: controlling transport instance
563  *
564  * Caller serializes. Either the transport send lock is held,
565  * or we're being called to destroy the transport.
566  *
567  * On return, @r_xprt is completely divested of all hardware
568  * resources and prepared for the next ->connect operation.
569  */
570 void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
571 {
572         struct rpcrdma_ep *ep = r_xprt->rx_ep;
573         struct rdma_cm_id *id;
574         int rc;
575
576         if (!ep)
577                 return;
578
579         id = ep->re_id;
580         rc = rdma_disconnect(id);
581         trace_xprtrdma_disconnect(r_xprt, rc);
582
583         rpcrdma_xprt_drain(r_xprt);
584         rpcrdma_reps_unmap(r_xprt);
585         rpcrdma_reqs_reset(r_xprt);
586         rpcrdma_mrs_destroy(r_xprt);
587         rpcrdma_sendctxs_destroy(r_xprt);
588
589         if (rpcrdma_ep_put(ep))
590                 rdma_destroy_id(id);
591
592         r_xprt->rx_ep = NULL;
593 }
594
595 /* Fixed-size circular FIFO queue. This implementation is wait-free and
596  * lock-free.
597  *
598  * Consumer is the code path that posts Sends. This path dequeues a
599  * sendctx for use by a Send operation. Multiple consumer threads
600  * are serialized by the RPC transport lock, which allows only one
601  * ->send_request call at a time.
602  *
603  * Producer is the code path that handles Send completions. This path
604  * enqueues a sendctx that has been completed. Multiple producer
605  * threads are serialized by the ib_poll_cq() function.
606  */
607
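/* A worked illustration of the index arithmetic (hypothetical values,
 * added for clarity): with rb_sc_last == 3, the valid slots are 0..3 and
 * rpcrdma_sendctx_next() advances 0 -> 1 -> 2 -> 3 -> 0. The consumer
 * never advances rb_sc_head onto rb_sc_tail; that condition is what
 * rpcrdma_sendctx_get_locked() reports as an empty queue. The array is
 * sized to re_max_requests + RPCRDMA_MAX_BC_REQUESTS entries in
 * rpcrdma_sendctxs_create(), which caps the number of concurrently
 * outstanding Send WRs.
 */
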
608 /* rpcrdma_sendctxs_destroy() assumes the caller has already quiesced
609  * queue activity and that rpcrdma_xprt_drain() has flushed all
610  * remaining Send requests.
611  */
612 static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
613 {
614         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
615         unsigned long i;
616
617         if (!buf->rb_sc_ctxs)
618                 return;
619         for (i = 0; i <= buf->rb_sc_last; i++)
620                 kfree(buf->rb_sc_ctxs[i]);
621         kfree(buf->rb_sc_ctxs);
622         buf->rb_sc_ctxs = NULL;
623 }
624
625 static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
626 {
627         struct rpcrdma_sendctx *sc;
628
629         sc = kzalloc(struct_size(sc, sc_sges, ep->re_attr.cap.max_send_sge),
630                      GFP_KERNEL);
631         if (!sc)
632                 return NULL;
633
634         sc->sc_cqe.done = rpcrdma_wc_send;
635         sc->sc_cid.ci_queue_id = ep->re_attr.send_cq->res.id;
636         sc->sc_cid.ci_completion_id =
637                 atomic_inc_return(&ep->re_completion_ids);
638         return sc;
639 }
640
641 static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
642 {
643         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
644         struct rpcrdma_sendctx *sc;
645         unsigned long i;
646
647         /* Maximum number of concurrent outstanding Send WRs. Capping
648          * the circular queue size stops Send Queue overflow by causing
649          * the ->send_request call to fail temporarily before too many
650          * Sends are posted.
651          */
652         i = r_xprt->rx_ep->re_max_requests + RPCRDMA_MAX_BC_REQUESTS;
653         buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
654         if (!buf->rb_sc_ctxs)
655                 return -ENOMEM;
656
657         buf->rb_sc_last = i - 1;
658         for (i = 0; i <= buf->rb_sc_last; i++) {
659                 sc = rpcrdma_sendctx_create(r_xprt->rx_ep);
660                 if (!sc)
661                         return -ENOMEM;
662
663                 buf->rb_sc_ctxs[i] = sc;
664         }
665
666         buf->rb_sc_head = 0;
667         buf->rb_sc_tail = 0;
668         return 0;
669 }
670
671 /* The sendctx queue is not guaranteed to have a size that is a
672  * power of two, thus the helpers in circ_buf.h cannot be used.
673  * The other option is to use modulus (%), which can be expensive.
674  */
675 static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
676                                           unsigned long item)
677 {
678         return likely(item < buf->rb_sc_last) ? item + 1 : 0;
679 }
680
681 /**
682  * rpcrdma_sendctx_get_locked - Acquire a send context
683  * @r_xprt: controlling transport instance
684  *
685  * Returns pointer to a free send completion context; or NULL if
686  * the queue is empty.
687  *
688  * Usage: Called to acquire an SGE array before preparing a Send WR.
689  *
690  * The caller serializes calls to this function (per transport), and
691  * provides an effective memory barrier that flushes the new value
692  * of rb_sc_head.
693  */
694 struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
695 {
696         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
697         struct rpcrdma_sendctx *sc;
698         unsigned long next_head;
699
700         next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
701
702         if (next_head == READ_ONCE(buf->rb_sc_tail))
703                 goto out_emptyq;
704
705         /* ORDER: item must be accessed _before_ head is updated */
706         sc = buf->rb_sc_ctxs[next_head];
707
708         /* Releasing the lock in the caller acts as a memory
709          * barrier that flushes rb_sc_head.
710          */
711         buf->rb_sc_head = next_head;
712
713         return sc;
714
715 out_emptyq:
716         /* The queue is "empty" if there have not been enough Send
717          * completions recently. This is a sign the Send Queue is
718          * backing up. Cause the caller to pause and try again.
719          */
720         xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
721         r_xprt->rx_stats.empty_sendctx_q++;
722         return NULL;
723 }
724
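/* A minimal usage sketch (illustrative only, not lifted from the in-tree
 * callers): the caller serializes on the transport send lock, as
 * ->send_request does, and treats a NULL return as "try again later":
 *
 *        struct rpcrdma_sendctx *sc;
 *
 *        sc = rpcrdma_sendctx_get_locked(r_xprt);
 *        if (!sc)
 *                return -EAGAIN;  (retried once xprt_write_space() fires)
 *         ... fill in sc->sc_sges and post the Send WR ...
 *
 * The matching rpcrdma_sendctx_put_locked() is issued later from the
 * Send completion handler, rpcrdma_wc_send().
 */
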
725 /**
726  * rpcrdma_sendctx_put_locked - Release a send context
727  * @r_xprt: controlling transport instance
728  * @sc: send context to release
729  *
730  * Usage: Called from Send completion to return a sendctx
731  * to the queue.
732  *
733  * The caller serializes calls to this function (per transport).
734  */
735 static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
736                                        struct rpcrdma_sendctx *sc)
737 {
738         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
739         unsigned long next_tail;
740
741         /* Unmap SGEs of previously completed but unsignaled
742          * Sends by walking up the queue until @sc is found.
743          */
744         next_tail = buf->rb_sc_tail;
745         do {
746                 next_tail = rpcrdma_sendctx_next(buf, next_tail);
747
748                 /* ORDER: item must be accessed _before_ tail is updated */
749                 rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
750
751         } while (buf->rb_sc_ctxs[next_tail] != sc);
752
753         /* Paired with READ_ONCE() in rpcrdma_sendctx_get_locked() */
754         smp_store_release(&buf->rb_sc_tail, next_tail);
755
756         xprt_write_space(&r_xprt->rx_xprt);
757 }
758
759 static void
760 rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
761 {
762         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
763         struct rpcrdma_ep *ep = r_xprt->rx_ep;
764         unsigned int count;
765
766         for (count = 0; count < ep->re_max_rdma_segs; count++) {
767                 struct rpcrdma_mr *mr;
768                 int rc;
769
770                 mr = kzalloc(sizeof(*mr), GFP_NOFS);
771                 if (!mr)
772                         break;
773
774                 rc = frwr_mr_init(r_xprt, mr);
775                 if (rc) {
776                         kfree(mr);
777                         break;
778                 }
779
780                 spin_lock(&buf->rb_lock);
781                 rpcrdma_mr_push(mr, &buf->rb_mrs);
782                 list_add(&mr->mr_all, &buf->rb_all_mrs);
783                 spin_unlock(&buf->rb_lock);
784         }
785
786         r_xprt->rx_stats.mrs_allocated += count;
787         trace_xprtrdma_createmrs(r_xprt, count);
788 }
789
790 static void
791 rpcrdma_mr_refresh_worker(struct work_struct *work)
792 {
793         struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
794                                                   rb_refresh_worker);
795         struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
796                                                    rx_buf);
797
798         rpcrdma_mrs_create(r_xprt);
799         xprt_write_space(&r_xprt->rx_xprt);
800 }
801
802 /**
803  * rpcrdma_mrs_refresh - Wake the MR refresh worker
804  * @r_xprt: controlling transport instance
805  *
806  */
807 void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
808 {
809         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
810         struct rpcrdma_ep *ep = r_xprt->rx_ep;
811
812         /* If there is no underlying connection, it's no use
813          * to wake the refresh worker.
814          */
815         if (ep->re_connect_status == 1) {
816                 /* The work is scheduled on a WQ_MEM_RECLAIM
817                  * workqueue in order to prevent MR allocation
818                  * from recursing into NFS during direct reclaim.
819                  */
820                 queue_work(xprtiod_workqueue, &buf->rb_refresh_worker);
821         }
822 }
823
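/* A caller sketch for the MR pool (illustrative only): when the pool
 * runs dry, kick the refresh worker and back off until more MRs have
 * been allocated:
 *
 *        mr = rpcrdma_mr_get(r_xprt);
 *        if (!mr) {
 *                rpcrdma_mrs_refresh(r_xprt);
 *                return -EAGAIN;
 *        }
 *
 * rpcrdma_mr_refresh_worker() then invokes rpcrdma_mrs_create() and
 * wakes waiters via xprt_write_space().
 */
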
824 /**
825  * rpcrdma_req_create - Allocate an rpcrdma_req object
826  * @r_xprt: controlling r_xprt
827  * @size: initial size, in bytes, of send and receive buffers
828  * @flags: GFP flags passed to memory allocators
829  *
830  * Returns an allocated and fully initialized rpcrdma_req or NULL.
831  */
832 struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
833                                        gfp_t flags)
834 {
835         struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
836         struct rpcrdma_req *req;
837
838         req = kzalloc(sizeof(*req), flags);
839         if (req == NULL)
840                 goto out1;
841
842         req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
843         if (!req->rl_sendbuf)
844                 goto out2;
845
846         req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
847         if (!req->rl_recvbuf)
848                 goto out3;
849
850         INIT_LIST_HEAD(&req->rl_free_mrs);
851         INIT_LIST_HEAD(&req->rl_registered);
852         spin_lock(&buffer->rb_lock);
853         list_add(&req->rl_all, &buffer->rb_allreqs);
854         spin_unlock(&buffer->rb_lock);
855         return req;
856
857 out3:
858         kfree(req->rl_sendbuf);
859 out2:
860         kfree(req);
861 out1:
862         return NULL;
863 }
864
865 /**
866  * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
867  * @r_xprt: controlling transport instance
868  * @req: rpcrdma_req object to set up
869  *
870  * Returns zero on success, and a negative errno on failure.
871  */
872 int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
873 {
874         struct rpcrdma_regbuf *rb;
875         size_t maxhdrsize;
876
877         /* Compute maximum header buffer size in bytes */
878         maxhdrsize = rpcrdma_fixed_maxsz + 3 +
879                      r_xprt->rx_ep->re_max_rdma_segs * rpcrdma_readchunk_maxsz;
880         maxhdrsize *= sizeof(__be32);
881         rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
882                                   DMA_TO_DEVICE, GFP_KERNEL);
883         if (!rb)
884                 goto out;
885
886         if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
887                 goto out_free;
888
889         req->rl_rdmabuf = rb;
890         xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
891         return 0;
892
893 out_free:
894         rpcrdma_regbuf_free(rb);
895 out:
896         return -ENOMEM;
897 }
898
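/* Worked example of the sizing in rpcrdma_req_setup() above: the header
 * buffer holds rpcrdma_fixed_maxsz XDR words for the fixed header,
 * three further words (presumably one per chunk list), and one
 * read-chunk descriptor per supported RDMA segment. With
 * S == re_max_rdma_segs:
 *
 *        maxhdrsize = (rpcrdma_fixed_maxsz + 3 + S * rpcrdma_readchunk_maxsz)
 *                      * sizeof(__be32)
 *
 * bytes, rounded up to the next power of two before the regbuf is
 * allocated and DMA mapped.
 */
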
899 /* ASSUMPTION: the rb_allreqs list is stable for the duration,
900  * and thus can be walked without holding rb_lock; e.g., the
901  * caller holds the transport send lock to exclude device
902  * removal or disconnection.
903  */
904 static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
905 {
906         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
907         struct rpcrdma_req *req;
908         int rc;
909
910         list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
911                 rc = rpcrdma_req_setup(r_xprt, req);
912                 if (rc)
913                         return rc;
914         }
915         return 0;
916 }
917
918 static void rpcrdma_req_reset(struct rpcrdma_req *req)
919 {
920         /* Credits are valid for only one connection */
921         req->rl_slot.rq_cong = 0;
922
923         rpcrdma_regbuf_free(req->rl_rdmabuf);
924         req->rl_rdmabuf = NULL;
925
926         rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
927         rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
928
929         frwr_reset(req);
930 }
931
932 /* ASSUMPTION: the rb_allreqs list is stable for the duration,
933  * and thus can be walked without holding rb_lock; e.g., the
934  * caller holds the transport send lock to exclude device
935  * removal or disconnection.
936  */
937 static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
938 {
939         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
940         struct rpcrdma_req *req;
941
942         list_for_each_entry(req, &buf->rb_allreqs, rl_all)
943                 rpcrdma_req_reset(req);
944 }
945
946 static noinline
947 struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
948                                        bool temp)
949 {
950         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
951         struct rpcrdma_rep *rep;
952
953         rep = kzalloc(sizeof(*rep), GFP_KERNEL);
954         if (rep == NULL)
955                 goto out;
956
957         rep->rr_rdmabuf = rpcrdma_regbuf_alloc(r_xprt->rx_ep->re_inline_recv,
958                                                DMA_FROM_DEVICE, GFP_KERNEL);
959         if (!rep->rr_rdmabuf)
960                 goto out_free;
961
962         if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf))
963                 goto out_free_regbuf;
964
965         rep->rr_cid.ci_completion_id =
966                 atomic_inc_return(&r_xprt->rx_ep->re_completion_ids);
967
968         xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
969                      rdmab_length(rep->rr_rdmabuf));
970         rep->rr_cqe.done = rpcrdma_wc_receive;
971         rep->rr_rxprt = r_xprt;
972         rep->rr_recv_wr.next = NULL;
973         rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
974         rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
975         rep->rr_recv_wr.num_sge = 1;
976         rep->rr_temp = temp;
977
978         spin_lock(&buf->rb_lock);
979         list_add(&rep->rr_all, &buf->rb_all_reps);
980         spin_unlock(&buf->rb_lock);
981         return rep;
982
983 out_free_regbuf:
984         rpcrdma_regbuf_free(rep->rr_rdmabuf);
985 out_free:
986         kfree(rep);
987 out:
988         return NULL;
989 }
990
991 static void rpcrdma_rep_free(struct rpcrdma_rep *rep)
992 {
993         rpcrdma_regbuf_free(rep->rr_rdmabuf);
994         kfree(rep);
995 }
996
997 static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
998 {
999         struct rpcrdma_buffer *buf = &rep->rr_rxprt->rx_buf;
1000
1001         spin_lock(&buf->rb_lock);
1002         list_del(&rep->rr_all);
1003         spin_unlock(&buf->rb_lock);
1004
1005         rpcrdma_rep_free(rep);
1006 }
1007
1008 static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
1009 {
1010         struct llist_node *node;
1011
1012         /* Calls to llist_del_first are required to be serialized */
1013         node = llist_del_first(&buf->rb_free_reps);
1014         if (!node)
1015                 return NULL;
1016         return llist_entry(node, struct rpcrdma_rep, rr_node);
1017 }
1018
1019 /**
1020  * rpcrdma_rep_put - Release rpcrdma_rep back to free list
1021  * @buf: buffer pool
1022  * @rep: rep to release
1023  *
1024  */
1025 void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep)
1026 {
1027         llist_add(&rep->rr_node, &buf->rb_free_reps);
1028 }
1029
1030 /* Caller must ensure the QP is quiescent (RQ is drained) before
1031  * invoking this function, to guarantee rb_all_reps is not
1032  * changing.
1033  */
1034 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
1035 {
1036         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1037         struct rpcrdma_rep *rep;
1038
1039         list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
1040                 rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
1041                 rep->rr_temp = true;    /* Mark this rep for destruction */
1042         }
1043 }
1044
1045 static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
1046 {
1047         struct rpcrdma_rep *rep;
1048
1049         spin_lock(&buf->rb_lock);
1050         while ((rep = list_first_entry_or_null(&buf->rb_all_reps,
1051                                                struct rpcrdma_rep,
1052                                                rr_all)) != NULL) {
1053                 list_del(&rep->rr_all);
1054                 spin_unlock(&buf->rb_lock);
1055
1056                 rpcrdma_rep_free(rep);
1057
1058                 spin_lock(&buf->rb_lock);
1059         }
1060         spin_unlock(&buf->rb_lock);
1061 }
1062
1063 /**
1064  * rpcrdma_buffer_create - Create initial set of req/rep objects
1065  * @r_xprt: transport instance to (re)initialize
1066  *
1067  * Returns zero on success, otherwise a negative errno.
1068  */
1069 int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1070 {
1071         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1072         int i, rc;
1073
1074         buf->rb_bc_srv_max_requests = 0;
1075         spin_lock_init(&buf->rb_lock);
1076         INIT_LIST_HEAD(&buf->rb_mrs);
1077         INIT_LIST_HEAD(&buf->rb_all_mrs);
1078         INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
1079
1080         INIT_LIST_HEAD(&buf->rb_send_bufs);
1081         INIT_LIST_HEAD(&buf->rb_allreqs);
1082         INIT_LIST_HEAD(&buf->rb_all_reps);
1083
1084         rc = -ENOMEM;
1085         for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
1086                 struct rpcrdma_req *req;
1087
1088                 req = rpcrdma_req_create(r_xprt, RPCRDMA_V1_DEF_INLINE_SIZE * 2,
1089                                          GFP_KERNEL);
1090                 if (!req)
1091                         goto out;
1092                 list_add(&req->rl_list, &buf->rb_send_bufs);
1093         }
1094
1095         init_llist_head(&buf->rb_free_reps);
1096
1097         return 0;
1098 out:
1099         rpcrdma_buffer_destroy(buf);
1100         return rc;
1101 }
1102
1103 /**
1104  * rpcrdma_req_destroy - Destroy an rpcrdma_req object
1105  * @req: unused object to be destroyed
1106  *
1107  * Relies on caller holding the transport send lock to protect
1108  * removing req->rl_all from buf->rb_allreqs safely.
1109  */
1110 void rpcrdma_req_destroy(struct rpcrdma_req *req)
1111 {
1112         struct rpcrdma_mr *mr;
1113
1114         list_del(&req->rl_all);
1115
1116         while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
1117                 struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
1118
1119                 spin_lock(&buf->rb_lock);
1120                 list_del(&mr->mr_all);
1121                 spin_unlock(&buf->rb_lock);
1122
1123                 frwr_mr_release(mr);
1124         }
1125
1126         rpcrdma_regbuf_free(req->rl_recvbuf);
1127         rpcrdma_regbuf_free(req->rl_sendbuf);
1128         rpcrdma_regbuf_free(req->rl_rdmabuf);
1129         kfree(req);
1130 }
1131
1132 /**
1133  * rpcrdma_mrs_destroy - Release all of a transport's MRs
1134  * @r_xprt: controlling transport instance
1135  *
1136  * Relies on caller holding the transport send lock to protect
1137  * removing mr->mr_list from req->rl_free_mrs safely.
1138  */
1139 static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
1140 {
1141         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1142         struct rpcrdma_mr *mr;
1143
1144         cancel_work_sync(&buf->rb_refresh_worker);
1145
1146         spin_lock(&buf->rb_lock);
1147         while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
1148                                               struct rpcrdma_mr,
1149                                               mr_all)) != NULL) {
1150                 list_del(&mr->mr_list);
1151                 list_del(&mr->mr_all);
1152                 spin_unlock(&buf->rb_lock);
1153
1154                 frwr_mr_release(mr);
1155
1156                 spin_lock(&buf->rb_lock);
1157         }
1158         spin_unlock(&buf->rb_lock);
1159 }
1160
1161 /**
1162  * rpcrdma_buffer_destroy - Release all hw resources
1163  * @buf: root control block for resources
1164  *
1165  * ORDERING: relies on a prior rpcrdma_xprt_drain:
1166  * - No more Send or Receive completions can occur
1167  * - All MRs, reps, and reqs are returned to their free lists
1168  */
1169 void
1170 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1171 {
1172         rpcrdma_reps_destroy(buf);
1173
1174         while (!list_empty(&buf->rb_send_bufs)) {
1175                 struct rpcrdma_req *req;
1176
1177                 req = list_first_entry(&buf->rb_send_bufs,
1178                                        struct rpcrdma_req, rl_list);
1179                 list_del(&req->rl_list);
1180                 rpcrdma_req_destroy(req);
1181         }
1182 }
1183
1184 /**
1185  * rpcrdma_mr_get - Allocate an rpcrdma_mr object
1186  * @r_xprt: controlling transport
1187  *
1188  * Returns an initialized rpcrdma_mr or NULL if no free
1189  * rpcrdma_mr objects are available.
1190  */
1191 struct rpcrdma_mr *
1192 rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
1193 {
1194         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1195         struct rpcrdma_mr *mr;
1196
1197         spin_lock(&buf->rb_lock);
1198         mr = rpcrdma_mr_pop(&buf->rb_mrs);
1199         spin_unlock(&buf->rb_lock);
1200         return mr;
1201 }
1202
1203 /**
1204  * rpcrdma_reply_put - Put reply buffers back into pool
1205  * @buffers: buffer pool
1206  * @req: object to return
1207  *
1208  */
1209 void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1210 {
1211         if (req->rl_reply) {
1212                 rpcrdma_rep_put(buffers, req->rl_reply);
1213                 req->rl_reply = NULL;
1214         }
1215 }
1216
1217 /**
1218  * rpcrdma_buffer_get - Get a request buffer
1219  * @buffers: Buffer pool from which to obtain a buffer
1220  *
1221  * Returns a fresh rpcrdma_req, or NULL if none are available.
1222  */
1223 struct rpcrdma_req *
1224 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1225 {
1226         struct rpcrdma_req *req;
1227
1228         spin_lock(&buffers->rb_lock);
1229         req = list_first_entry_or_null(&buffers->rb_send_bufs,
1230                                        struct rpcrdma_req, rl_list);
1231         if (req)
1232                 list_del_init(&req->rl_list);
1233         spin_unlock(&buffers->rb_lock);
1234         return req;
1235 }
1236
1237 /**
1238  * rpcrdma_buffer_put - Put request/reply buffers back into pool
1239  * @buffers: buffer pool
1240  * @req: object to return
1241  *
1242  */
1243 void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
1244 {
1245         rpcrdma_reply_put(buffers, req);
1246
1247         spin_lock(&buffers->rb_lock);
1248         list_add(&req->rl_list, &buffers->rb_send_bufs);
1249         spin_unlock(&buffers->rb_lock);
1250 }
1251
1252 /* Returns a pointer to a rpcrdma_regbuf object, or NULL.
1253  *
1254  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1255  * receiving the payload of RDMA RECV operations. During Long Calls
1256  * or Replies they may be registered externally via frwr_map.
1257  */
1258 static struct rpcrdma_regbuf *
1259 rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction,
1260                      gfp_t flags)
1261 {
1262         struct rpcrdma_regbuf *rb;
1263
1264         rb = kmalloc(sizeof(*rb), flags);
1265         if (!rb)
1266                 return NULL;
1267         rb->rg_data = kmalloc(size, flags);
1268         if (!rb->rg_data) {
1269                 kfree(rb);
1270                 return NULL;
1271         }
1272
1273         rb->rg_device = NULL;
1274         rb->rg_direction = direction;
1275         rb->rg_iov.length = size;
1276         return rb;
1277 }
1278
1279 /**
1280  * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
1281  * @rb: regbuf to reallocate
1282  * @size: size of buffer to be allocated, in bytes
1283  * @flags: GFP flags
1284  *
1285  * Returns true if reallocation was successful. If false is
1286  * returned, @rb is left untouched.
1287  */
1288 bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
1289 {
1290         void *buf;
1291
1292         buf = kmalloc(size, flags);
1293         if (!buf)
1294                 return false;
1295
1296         rpcrdma_regbuf_dma_unmap(rb);
1297         kfree(rb->rg_data);
1298
1299         rb->rg_data = buf;
1300         rb->rg_iov.length = size;
1301         return true;
1302 }
1303
1304 /**
1305  * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
1306  * @r_xprt: controlling transport instance
1307  * @rb: regbuf to be mapped
1308  *
1309  * Returns true if the buffer is now DMA mapped to @r_xprt's device
1310  */
1311 bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
1312                               struct rpcrdma_regbuf *rb)
1313 {
1314         struct ib_device *device = r_xprt->rx_ep->re_id->device;
1315
1316         if (rb->rg_direction == DMA_NONE)
1317                 return false;
1318
1319         rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
1320                                             rdmab_length(rb), rb->rg_direction);
1321         if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
1322                 trace_xprtrdma_dma_maperr(rdmab_addr(rb));
1323                 return false;
1324         }
1325
1326         rb->rg_device = device;
1327         rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
1328         return true;
1329 }
1330
1331 static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
1332 {
1333         if (!rb)
1334                 return;
1335
1336         if (!rpcrdma_regbuf_is_mapped(rb))
1337                 return;
1338
1339         ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
1340                             rb->rg_direction);
1341         rb->rg_device = NULL;
1342 }
1343
1344 static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
1345 {
1346         rpcrdma_regbuf_dma_unmap(rb);
1347         if (rb)
1348                 kfree(rb->rg_data);
1349         kfree(rb);
1350 }
1351
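/* Typical regbuf lifecycle, sketched from the helpers above (illustrative
 * only; most error handling elided):
 *
 *        rb = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, GFP_KERNEL);
 *        if (!rb)
 *                return -ENOMEM;
 *        if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
 *                goto out_free;
 *         ... rdmab_data(rb), rdmab_addr(rb), rdmab_length(rb) are now
 *             usable for building SGEs ...
 *        rpcrdma_regbuf_free(rb);   (unmaps, then frees)
 *
 * Both rpcrdma_regbuf_free() and rpcrdma_regbuf_dma_unmap() tolerate a
 * NULL or unmapped @rb, so teardown paths need not check first.
 */
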
1352 /**
1353  * rpcrdma_post_sends - Post WRs to a transport's Send Queue
1354  * @r_xprt: controlling transport instance
1355  * @req: rpcrdma_req containing the Send WR to post
1356  *
1357  * Returns 0 if the post was successful, otherwise -ENOTCONN
1358  * is returned.
1359  */
1360 int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
1361 {
1362         if (frwr_send(r_xprt, req))
1363                 return -ENOTCONN;
1364         return 0;
1365 }
1366
1367 /**
1368  * rpcrdma_post_recvs - Refill the Receive Queue
1369  * @r_xprt: controlling transport instance
1370  * @needed: current credit grant
1371  * @temp: mark Receive buffers to be deleted after one use
1372  *
1373  */
1374 void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp)
1375 {
1376         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1377         struct rpcrdma_ep *ep = r_xprt->rx_ep;
1378         struct ib_recv_wr *wr, *bad_wr;
1379         struct rpcrdma_rep *rep;
1380         int count, rc;
1381
1382         rc = 0;
1383         count = 0;
1384
1385         if (likely(ep->re_receive_count > needed))
1386                 goto out;
1387         needed -= ep->re_receive_count;
1388         if (!temp)
1389                 needed += RPCRDMA_MAX_RECV_BATCH;
1390
1391         if (atomic_inc_return(&ep->re_receiving) > 1)
1392                 goto out;
1393
1394         /* fast path: all needed reps can be found on the free list */
1395         wr = NULL;
1396         while (needed) {
1397                 rep = rpcrdma_rep_get_locked(buf);
1398                 if (rep && rep->rr_temp) {
1399                         rpcrdma_rep_destroy(rep);
1400                         continue;
1401                 }
1402                 if (!rep)
1403                         rep = rpcrdma_rep_create(r_xprt, temp);
1404                 if (!rep)
1405                         break;
1406
1407                 rep->rr_cid.ci_queue_id = ep->re_attr.recv_cq->res.id;
1408                 trace_xprtrdma_post_recv(rep);
1409                 rep->rr_recv_wr.next = wr;
1410                 wr = &rep->rr_recv_wr;
1411                 --needed;
1412                 ++count;
1413         }
1414         if (!wr)
1415                 goto out;
1416
1417         rc = ib_post_recv(ep->re_id->qp, wr,
1418                           (const struct ib_recv_wr **)&bad_wr);
1419         if (atomic_dec_return(&ep->re_receiving) > 0)
1420                 complete(&ep->re_done);
1421
1422 out:
1423         trace_xprtrdma_post_recvs(r_xprt, count, rc);
1424         if (rc) {
1425                 for (wr = bad_wr; wr;) {
1426                         struct rpcrdma_rep *rep;
1427
1428                         rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
1429                         wr = wr->next;
1430                         rpcrdma_rep_put(buf, rep);
1431                         --count;
1432                 }
1433         }
1434         ep->re_receive_count += count;
1435         return;
1436 }
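/* Worked example of the Receive accounting above (hypothetical values):
 * with a credit grant of needed == 8, two Receives already posted
 * (re_receive_count == 2), and temp == false, the function attempts to
 * post 8 - 2 + RPCRDMA_MAX_RECV_BATCH fresh Receive WRs. The number
 * actually posted is added to re_receive_count, and rpcrdma_wc_receive()
 * decrements that count as each Receive completes.
 */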