Merge tag 'xtensa-20210902' of git://github.com/jcmvbkbc/linux-xtensa
[linux-2.6-microblaze.git] / net / sunrpc / xprtrdma / svc_rdma_backchannel.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Copyright (c) 2015-2018 Oracle.  All rights reserved.
4  *
5  * Support for reverse-direction RPCs on RPC/RDMA (server-side).
6  */
7
8 #include <linux/sunrpc/svc_rdma.h>
9
10 #include "xprt_rdma.h"
11 #include <trace/events/rpcrdma.h>
12
13 /**
14  * svc_rdma_handle_bc_reply - Process incoming backchannel Reply
15  * @rqstp: resources for handling the Reply
16  * @rctxt: Received message
17  *
18  */
19 void svc_rdma_handle_bc_reply(struct svc_rqst *rqstp,
20                               struct svc_rdma_recv_ctxt *rctxt)
21 {
22         struct svc_xprt *sxprt = rqstp->rq_xprt;
23         struct rpc_xprt *xprt = sxprt->xpt_bc_xprt;
24         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
25         struct xdr_buf *rcvbuf = &rqstp->rq_arg;
26         struct kvec *dst, *src = &rcvbuf->head[0];
27         __be32 *rdma_resp = rctxt->rc_recv_buf;
28         struct rpc_rqst *req;
29         u32 credits;
30
31         spin_lock(&xprt->queue_lock);
32         req = xprt_lookup_rqst(xprt, *rdma_resp);
33         if (!req)
34                 goto out_unlock;
35
36         dst = &req->rq_private_buf.head[0];
37         memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
38         if (dst->iov_len < src->iov_len)
39                 goto out_unlock;
40         memcpy(dst->iov_base, src->iov_base, src->iov_len);
41         xprt_pin_rqst(req);
42         spin_unlock(&xprt->queue_lock);
43
44         credits = be32_to_cpup(rdma_resp + 2);
45         if (credits == 0)
46                 credits = 1;    /* don't deadlock */
47         else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
48                 credits = r_xprt->rx_buf.rb_bc_max_requests;
49         spin_lock(&xprt->transport_lock);
50         xprt->cwnd = credits << RPC_CWNDSHIFT;
51         spin_unlock(&xprt->transport_lock);
52
53         spin_lock(&xprt->queue_lock);
54         xprt_complete_rqst(req->rq_task, rcvbuf->len);
55         xprt_unpin_rqst(req);
56         rcvbuf->len = 0;
57
58 out_unlock:
59         spin_unlock(&xprt->queue_lock);
60 }
61
62 /* Send a reverse-direction RPC Call.
63  *
64  * Caller holds the connection's mutex and has already marshaled
65  * the RPC/RDMA request.
66  *
67  * This is similar to svc_rdma_send_reply_msg, but takes a struct
68  * rpc_rqst instead, does not support chunks, and avoids blocking
69  * memory allocation.
70  *
71  * XXX: There is still an opportunity to block in svc_rdma_send()
72  * if there are no SQ entries to post the Send. This may occur if
73  * the adapter has a small maximum SQ depth.
74  */
75 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
76                               struct rpc_rqst *rqst,
77                               struct svc_rdma_send_ctxt *sctxt)
78 {
79         struct svc_rdma_recv_ctxt *rctxt;
80         int ret;
81
82         rctxt = svc_rdma_recv_ctxt_get(rdma);
83         if (!rctxt)
84                 return -EIO;
85
86         ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqst->rq_snd_buf);
87         svc_rdma_recv_ctxt_put(rdma, rctxt);
88         if (ret < 0)
89                 return -EIO;
90
91         /* Bump page refcnt so Send completion doesn't release
92          * the rq_buffer before all retransmits are complete.
93          */
94         get_page(virt_to_page(rqst->rq_buffer));
95         sctxt->sc_send_wr.opcode = IB_WR_SEND;
96         ret = svc_rdma_send(rdma, sctxt);
97         if (ret < 0)
98                 return ret;
99
100         ret = wait_for_completion_killable(&sctxt->sc_done);
101         svc_rdma_send_ctxt_put(rdma, sctxt);
102         return ret;
103 }
104
105 /* Server-side transport endpoint wants a whole page for its send
106  * buffer. The client RPC code constructs the RPC header in this
107  * buffer before it invokes ->send_request.
108  */
109 static int
110 xprt_rdma_bc_allocate(struct rpc_task *task)
111 {
112         struct rpc_rqst *rqst = task->tk_rqstp;
113         size_t size = rqst->rq_callsize;
114         struct page *page;
115
116         if (size > PAGE_SIZE) {
117                 WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
118                           size);
119                 return -EINVAL;
120         }
121
122         page = alloc_page(RPCRDMA_DEF_GFP);
123         if (!page)
124                 return -ENOMEM;
125         rqst->rq_buffer = page_address(page);
126
127         rqst->rq_rbuffer = kmalloc(rqst->rq_rcvsize, RPCRDMA_DEF_GFP);
128         if (!rqst->rq_rbuffer) {
129                 put_page(page);
130                 return -ENOMEM;
131         }
132         return 0;
133 }
134
135 static void
136 xprt_rdma_bc_free(struct rpc_task *task)
137 {
138         struct rpc_rqst *rqst = task->tk_rqstp;
139
140         put_page(virt_to_page(rqst->rq_buffer));
141         kfree(rqst->rq_rbuffer);
142 }
143
144 static int
145 rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
146 {
147         struct rpc_xprt *xprt = rqst->rq_xprt;
148         struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
149         struct svc_rdma_send_ctxt *ctxt;
150         __be32 *p;
151         int rc;
152
153         ctxt = svc_rdma_send_ctxt_get(rdma);
154         if (!ctxt)
155                 goto drop_connection;
156
157         p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_MIN);
158         if (!p)
159                 goto put_ctxt;
160         *p++ = rqst->rq_xid;
161         *p++ = rpcrdma_version;
162         *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
163         *p++ = rdma_msg;
164         *p++ = xdr_zero;
165         *p++ = xdr_zero;
166         *p   = xdr_zero;
167
168         rqst->rq_xtime = ktime_get();
169         rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
170         if (rc)
171                 goto put_ctxt;
172         return 0;
173
174 put_ctxt:
175         svc_rdma_send_ctxt_put(rdma, ctxt);
176
177 drop_connection:
178         return -ENOTCONN;
179 }
180
181 /**
182  * xprt_rdma_bc_send_request - Send a reverse-direction Call
183  * @rqst: rpc_rqst containing Call message to be sent
184  *
185  * Return values:
186  *   %0 if the message was sent successfully
187  *   %ENOTCONN if the message was not sent
188  */
189 static int xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
190 {
191         struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
192         struct svcxprt_rdma *rdma =
193                 container_of(sxprt, struct svcxprt_rdma, sc_xprt);
194         int ret;
195
196         if (test_bit(XPT_DEAD, &sxprt->xpt_flags))
197                 return -ENOTCONN;
198
199         ret = rpcrdma_bc_send_request(rdma, rqst);
200         if (ret == -ENOTCONN)
201                 svc_close_xprt(sxprt);
202         return ret;
203 }
204
205 static void
206 xprt_rdma_bc_close(struct rpc_xprt *xprt)
207 {
208         xprt_disconnect_done(xprt);
209         xprt->cwnd = RPC_CWNDSHIFT;
210 }
211
/* ->destroy method: release the formatted address strings, then free
 * the rpc_xprt itself.
 */
static void xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	xprt_rdma_free_addresses(xprt);

	xprt_free(xprt);
}
218
219 static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
220         .reserve_xprt           = xprt_reserve_xprt_cong,
221         .release_xprt           = xprt_release_xprt_cong,
222         .alloc_slot             = xprt_alloc_slot,
223         .free_slot              = xprt_free_slot,
224         .release_request        = xprt_release_rqst_cong,
225         .buf_alloc              = xprt_rdma_bc_allocate,
226         .buf_free               = xprt_rdma_bc_free,
227         .send_request           = xprt_rdma_bc_send_request,
228         .wait_for_reply_request = xprt_wait_for_reply_request_def,
229         .close                  = xprt_rdma_bc_close,
230         .destroy                = xprt_rdma_bc_put,
231         .print_stats            = xprt_rdma_print_stats
232 };
233
234 static const struct rpc_timeout xprt_rdma_bc_timeout = {
235         .to_initval = 60 * HZ,
236         .to_maxval = 60 * HZ,
237 };
238
239 /* It shouldn't matter if the number of backchannel session slots
240  * doesn't match the number of RPC/RDMA credits. That just means
241  * one or the other will have extra slots that aren't used.
242  */
243 static struct rpc_xprt *
244 xprt_setup_rdma_bc(struct xprt_create *args)
245 {
246         struct rpc_xprt *xprt;
247         struct rpcrdma_xprt *new_xprt;
248
249         if (args->addrlen > sizeof(xprt->addr))
250                 return ERR_PTR(-EBADF);
251
252         xprt = xprt_alloc(args->net, sizeof(*new_xprt),
253                           RPCRDMA_MAX_BC_REQUESTS,
254                           RPCRDMA_MAX_BC_REQUESTS);
255         if (!xprt)
256                 return ERR_PTR(-ENOMEM);
257
258         xprt->timeout = &xprt_rdma_bc_timeout;
259         xprt_set_bound(xprt);
260         xprt_set_connected(xprt);
261         xprt->bind_timeout = 0;
262         xprt->reestablish_timeout = 0;
263         xprt->idle_timeout = 0;
264
265         xprt->prot = XPRT_TRANSPORT_BC_RDMA;
266         xprt->ops = &xprt_rdma_bc_procs;
267
268         memcpy(&xprt->addr, args->dstaddr, args->addrlen);
269         xprt->addrlen = args->addrlen;
270         xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
271         xprt->resvport = 0;
272
273         xprt->max_payload = xprt_rdma_max_inline_read;
274
275         new_xprt = rpcx_to_rdmax(xprt);
276         new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;
277
278         xprt_get(xprt);
279         args->bc_xprt->xpt_bc_xprt = xprt;
280         xprt->bc_xprt = args->bc_xprt;
281
282         /* Final put for backchannel xprt is in __svc_rdma_free */
283         xprt_get(xprt);
284         return xprt;
285 }
286
287 struct xprt_class xprt_rdma_bc = {
288         .list                   = LIST_HEAD_INIT(xprt_rdma_bc.list),
289         .name                   = "rdma backchannel",
290         .owner                  = THIS_MODULE,
291         .ident                  = XPRT_TRANSPORT_BC_RDMA,
292         .setup                  = xprt_setup_rdma_bc,
293 };