// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************

(c) 2007 Network Appliance, Inc. All Rights Reserved.
(c) 2009 NetApp. All Rights Reserved.

******************************************************************************/

#include <linux/tcp.h>
#include <linux/slab.h>
#include <linux/sunrpc/xprt.h>
#include <linux/export.h>
#include <linux/sunrpc/bc_xprt.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
#define RPCDBG_FACILITY RPCDBG_TRANS
#endif

#define BC_MAX_SLOTS    64U

unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
{
        return BC_MAX_SLOTS;
}

/*
 * Helper routines that track the number of preallocation elements
 * on the transport.
 */
static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
{
        return xprt->bc_alloc_count < xprt->bc_alloc_max;
}

/*
 * Free the preallocated rpc_rqst structure and the memory
 * buffers hanging off of it.
 */
static void xprt_free_allocation(struct rpc_rqst *req)
{
        struct xdr_buf *xbufp;

        dprintk("RPC: free allocations for req= %p\n", req);
        WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
        xbufp = &req->rq_rcv_buf;
        free_page((unsigned long)xbufp->head[0].iov_base);
        xbufp = &req->rq_snd_buf;
        free_page((unsigned long)xbufp->head[0].iov_base);
        kfree(req);
}

static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
{
        struct page *page;

        /* Preallocate one XDR receive buffer */
        page = alloc_page(gfp_flags);
        if (page == NULL)
                return -ENOMEM;
        xdr_buf_init(buf, page_address(page), PAGE_SIZE);
        return 0;
}

static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
{
        gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
        struct rpc_rqst *req;

        /* Pre-allocate one backchannel rpc_rqst */
        req = kzalloc(sizeof(*req), gfp_flags);
        if (req == NULL)
                return NULL;

        req->rq_xprt = xprt;
        INIT_LIST_HEAD(&req->rq_bc_list);

        /* Preallocate one XDR receive buffer */
        if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
                printk(KERN_ERR "Failed to create bc receive xbuf\n");
                goto out_free;
        }
        req->rq_rcv_buf.len = PAGE_SIZE;

        /* Preallocate one XDR send buffer */
        if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
                printk(KERN_ERR "Failed to create bc snd xbuf\n");
                goto out_free;
        }
        return req;
out_free:
        xprt_free_allocation(req);
        return NULL;
}

/*
 * Preallocate up to min_reqs structures and related buffers for use
 * by the backchannel.  This function can be called multiple times
 * when creating new sessions that use the same rpc_xprt.  The
 * preallocated buffers are added to the pool of resources used by
 * the rpc_xprt.  Any one of these resources may be used by an
 * incoming callback request.  It's up to the higher levels in the
 * stack to enforce that the maximum number of session slots is not
 * being exceeded.
 *
 * Some callback arguments can be large.  For example, a pNFS server
 * using multiple deviceids.  The list can be unbounded, but the client
 * has the ability to tell the server the maximum size of the callback
 * requests.  Each deviceID is 16 bytes, so allocate one page
 * for the arguments to have enough room to receive a number of these
 * deviceIDs.  The NFS client indicates to the pNFS server that its
 * callback requests can be up to 4096 bytes in size.
 */
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
{
        if (!xprt->ops->bc_setup)
                return 0;
        return xprt->ops->bc_setup(xprt, min_reqs);
}
EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
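
/*
 * Illustrative sketch (not part of this file): the typical caller is the
 * NFSv4.1 client's callback setup, which preallocates backchannel slots on
 * the forward-channel transport before creating a session, roughly:
 *
 *      status = xprt_setup_backchannel(xprt, NFS41_BC_MIN_CALLBACKS);
 *      if (status)
 *              goto err;
 *
 * The exact call site and error label are assumptions.  The work itself is
 * delegated to the transport's ->bc_setup method, which the TCP socket
 * transport points at xprt_setup_bc() below.
 */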

int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
{
        struct rpc_rqst *req;
        struct list_head tmp_list;
        int i;

        dprintk("RPC: setup backchannel transport\n");

        if (min_reqs > BC_MAX_SLOTS)
                min_reqs = BC_MAX_SLOTS;

        /*
         * We use a temporary list to keep track of the preallocated
         * buffers.  Once we're done building the list we splice it
         * into the backchannel preallocation list off of the rpc_xprt
         * struct.  This helps minimize the amount of time the list
         * lock is held on the rpc_xprt struct.  It also makes cleanup
         * easier in case of memory allocation errors.
         */
        INIT_LIST_HEAD(&tmp_list);
        for (i = 0; i < min_reqs; i++) {
                /* Pre-allocate one backchannel rpc_rqst */
                req = xprt_alloc_bc_req(xprt);
                if (req == NULL) {
                        printk(KERN_ERR "Failed to create bc rpc_rqst\n");
                        goto out_free;
                }

                /* Add the allocated buffer to the tmp list */
                dprintk("RPC: adding req= %p\n", req);
                list_add(&req->rq_bc_pa_list, &tmp_list);
        }

        /*
         * Add the temporary list to the backchannel preallocation list
         */
        spin_lock(&xprt->bc_pa_lock);
        list_splice(&tmp_list, &xprt->bc_pa_list);
        xprt->bc_alloc_count += min_reqs;
        xprt->bc_alloc_max += min_reqs;
        atomic_add(min_reqs, &xprt->bc_slot_count);
        spin_unlock(&xprt->bc_pa_lock);

        dprintk("RPC: setup backchannel transport done\n");
        return 0;

out_free:
        /*
         * Memory allocation failed, free the temporary list
         */
        while (!list_empty(&tmp_list)) {
                req = list_first_entry(&tmp_list,
                                struct rpc_rqst,
                                rq_bc_pa_list);
                list_del(&req->rq_bc_pa_list);
                xprt_free_allocation(req);
        }

        dprintk("RPC: setup backchannel transport failed\n");
        return -ENOMEM;
}

/**
 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
 * @xprt:	the transport holding the preallocated structures
 * @max_reqs:	the maximum number of preallocated structures to destroy
 *
 * Since these structures may have been allocated by multiple calls
 * to xprt_setup_backchannel, we only destroy up to the maximum number
 * of reqs specified by the caller.
 */
void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
{
        if (xprt->ops->bc_destroy)
                xprt->ops->bc_destroy(xprt, max_reqs);
}
EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
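
/*
 * Illustrative sketch (not part of this file): teardown mirrors setup.  When
 * the NFS client shuts down its callback service it is expected to release
 * the slots it preallocated, along the lines of:
 *
 *      xprt_destroy_backchannel(xprt, NFS41_BC_MIN_CALLBACKS);
 *
 * The call site is an assumption; the transport's ->bc_destroy method
 * (xprt_destroy_bc() below for TCP) does the actual freeing.
 */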

void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
{
        struct rpc_rqst *req = NULL, *tmp = NULL;

        dprintk("RPC: destroy backchannel transport\n");

        if (max_reqs == 0)
                goto out;

        spin_lock_bh(&xprt->bc_pa_lock);
        xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
        list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
                dprintk("RPC: req=%p\n", req);
                list_del(&req->rq_bc_pa_list);
                xprt_free_allocation(req);
                xprt->bc_alloc_count--;
                atomic_dec(&xprt->bc_slot_count);
                if (--max_reqs == 0)
                        break;
        }
        spin_unlock_bh(&xprt->bc_pa_lock);

out:
        dprintk("RPC: backchannel list empty= %s\n",
                list_empty(&xprt->bc_pa_list) ? "true" : "false");
}

static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
                struct rpc_rqst *new)
{
        struct rpc_rqst *req = NULL;

        dprintk("RPC: allocate a backchannel request\n");
        if (list_empty(&xprt->bc_pa_list)) {
                if (!new)
                        goto not_found;
                if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
                        goto not_found;
                list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
                xprt->bc_alloc_count++;
                atomic_inc(&xprt->bc_slot_count);
        }
        req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
                               rq_bc_pa_list);
        req->rq_reply_bytes_recvd = 0;
        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
               sizeof(req->rq_private_buf));
        req->rq_xid = xid;
        req->rq_connect_cookie = xprt->connect_cookie;
        dprintk("RPC: backchannel req=%p\n", req);
not_found:
        return req;
}

/*
 * Return the preallocated rpc_rqst structure and XDR buffers
 * associated with this rpc_task.
 */
void xprt_free_bc_request(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        xprt->ops->bc_free_rqst(req);
}

void xprt_free_bc_rqst(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: free backchannel req=%p\n", req);

        req->rq_connect_cookie = xprt->connect_cookie - 1;
        smp_mb__before_atomic();
        clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
        smp_mb__after_atomic();

        /*
         * Return it to the list of preallocations so that it
         * may be reused by a new callback request.
         */
        spin_lock_bh(&xprt->bc_pa_lock);
        if (xprt_need_to_requeue(xprt)) {
                list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
                xprt->bc_alloc_count++;
                atomic_inc(&xprt->bc_slot_count);
                req = NULL;
        }
        spin_unlock_bh(&xprt->bc_pa_lock);
        if (req != NULL) {
                /*
                 * The last remaining session was destroyed while this
                 * entry was in use.  Free the entry and don't attempt
                 * to add it back to the list because there is no need
                 * to have any more preallocated entries.
                 */
                dprintk("RPC: Last session removed req=%p\n", req);
                xprt_free_allocation(req);
        }
        xprt_put(xprt);
}

/*
 * One or more rpc_rqst structures have been preallocated during the
 * backchannel setup.  Buffer space for the send and private XDR buffers
 * has been preallocated as well.  Use xprt_alloc_bc_req() to allocate
 * a request and xprt_free_bc_request() to return it.
 *
 * We know that we're called in soft interrupt context, so grab the
 * spin_lock since there is no need to grab the bottom-half variant.
 *
 * Return an available rpc_rqst, otherwise NULL if none are available.
 */
struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
        struct rpc_rqst *req, *new = NULL;

        do {
                spin_lock(&xprt->bc_pa_lock);
                list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
                        if (req->rq_connect_cookie != xprt->connect_cookie)
                                continue;
                        if (req->rq_xid == xid)
                                goto found;
                }
                req = xprt_get_bc_request(xprt, xid, new);
found:
                spin_unlock(&xprt->bc_pa_lock);
                if (new) {
                        if (req != new)
                                xprt_free_allocation(new);
                        break;
                } else if (req)
                        break;
                new = xprt_alloc_bc_req(xprt);
        } while (new);
        return req;
}
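
/*
 * Illustrative sketch (not part of this file): a transport's receive path
 * uses the lookup/complete pair roughly as follows when a callback call
 * arrives on the wire (variable names here are assumptions):
 *
 *      req = xprt_lookup_bc_request(xprt, xid);
 *      if (!req)
 *              return -ESHUTDOWN;      // no backchannel slot available
 *      // ... copy the RPC call data into req->rq_rcv_buf ...
 *      xprt_complete_bc_request(req, copied);
 *
 * xprt_complete_bc_request() then hands the request to the callback service.
 */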

/*
 * Add callback request to callback list.  The callback
 * service sleeps on the sv_cb_waitq waiting for new
 * requests.  Wake it up after enqueuing the request.
 */
void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        struct svc_serv *bc_serv = xprt->bc_serv;

        spin_lock(&xprt->bc_pa_lock);
        list_del(&req->rq_bc_pa_list);
        xprt->bc_alloc_count--;
        spin_unlock(&xprt->bc_pa_lock);

        req->rq_private_buf.len = copied;
        set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);

        dprintk("RPC: add callback request to list\n");
        xprt_get(xprt);
        spin_lock(&bc_serv->sv_cb_lock);
        list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
        wake_up(&bc_serv->sv_cb_waitq);
        spin_unlock(&bc_serv->sv_cb_lock);
}
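
/*
 * Illustrative sketch (not part of this file): on the consumer side, the
 * NFS callback service thread sleeps on bc_serv->sv_cb_waitq and, when
 * woken, pulls requests off bc_serv->sv_cb_list and processes them,
 * conceptually:
 *
 *      req = list_first_entry(&bc_serv->sv_cb_list, struct rpc_rqst,
 *                             rq_bc_list);
 *      list_del(&req->rq_bc_list);
 *      // decode and execute the callback, then send the reply
 *
 * The exact service loop lives in the NFS client and svc code, not here.
 */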