Merge tag 'xtensa-20210902' of git://github.com/jcmvbkbc/linux-xtensa
[linux-2.6-microblaze.git] / net / sunrpc / clnt.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  *  linux/net/sunrpc/clnt.c
4  *
5  *  This file contains the high-level RPC interface.
6  *  It is modeled as a finite state machine to support both synchronous
7  *  and asynchronous requests.
8  *
9  *  -   RPC header generation and argument serialization.
10  *  -   Credential refresh.
11  *  -   TCP connect handling.
12  *  -   Retry of operation when it is suspected the operation failed because
13  *      of uid squashing on the server, or when the credentials were stale
14  *      and need to be refreshed, or when a packet was damaged in transit.
15  *      This may have to be moved to the VFS layer.
16  *
17  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
18  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
19  */
20
21
22 #include <linux/module.h>
23 #include <linux/types.h>
24 #include <linux/kallsyms.h>
25 #include <linux/mm.h>
26 #include <linux/namei.h>
27 #include <linux/mount.h>
28 #include <linux/slab.h>
29 #include <linux/rcupdate.h>
30 #include <linux/utsname.h>
31 #include <linux/workqueue.h>
32 #include <linux/in.h>
33 #include <linux/in6.h>
34 #include <linux/un.h>
35
36 #include <linux/sunrpc/clnt.h>
37 #include <linux/sunrpc/addr.h>
38 #include <linux/sunrpc/rpc_pipe_fs.h>
39 #include <linux/sunrpc/metrics.h>
40 #include <linux/sunrpc/bc_xprt.h>
41 #include <trace/events/sunrpc.h>
42
43 #include "sunrpc.h"
44 #include "sysfs.h"
45 #include "netns.h"
46
47 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
48 # define RPCDBG_FACILITY        RPCDBG_CALL
49 #endif
50
/*
 * Wait queue used while tearing down clients: rpc_shutdown_client()
 * sleeps on it until the client's task list is empty, and
 * rpc_release_client() wakes it when it sees an empty task list.
 */
static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);

/* Forward declarations of file-local call_* handlers. */
static void call_start(struct rpc_task *task);
static void call_reserve(struct rpc_task *task);
static void call_reserveresult(struct rpc_task *task);
static void call_allocate(struct rpc_task *task);
static void call_encode(struct rpc_task *task);
static void call_decode(struct rpc_task *task);
static void call_bind(struct rpc_task *task);
static void call_bind_status(struct rpc_task *task);
static void call_transmit(struct rpc_task *task);
static void call_status(struct rpc_task *task);
static void call_transmit_status(struct rpc_task *task);
static void call_refresh(struct rpc_task *task);
static void call_refreshresult(struct rpc_task *task);
static void call_connect(struct rpc_task *task);
static void call_connect_status(struct rpc_task *task);

/* Forward declarations of file-local helpers. */
static int rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr);
static int rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr);
static int rpc_ping(struct rpc_clnt *clnt);
static void rpc_check_timeout(struct rpc_task *task);
80
81 static void rpc_register_client(struct rpc_clnt *clnt)
82 {
83         struct net *net = rpc_net_ns(clnt);
84         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
85
86         spin_lock(&sn->rpc_client_lock);
87         list_add(&clnt->cl_clients, &sn->all_clients);
88         spin_unlock(&sn->rpc_client_lock);
89 }
90
91 static void rpc_unregister_client(struct rpc_clnt *clnt)
92 {
93         struct net *net = rpc_net_ns(clnt);
94         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
95
96         spin_lock(&sn->rpc_client_lock);
97         list_del(&clnt->cl_clients);
98         spin_unlock(&sn->rpc_client_lock);
99 }
100
/*
 * Remove @clnt's pipefs client directory.  Unlike
 * rpc_clnt_remove_pipedir(), this variant does not itself call
 * rpc_get_sb_net()/rpc_put_sb_net().
 */
static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
{
	rpc_remove_client_dir(clnt);
}
105
106 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
107 {
108         struct net *net = rpc_net_ns(clnt);
109         struct super_block *pipefs_sb;
110
111         pipefs_sb = rpc_get_sb_net(net);
112         if (pipefs_sb) {
113                 __rpc_clnt_remove_pipedir(clnt);
114                 rpc_put_sb_net(net);
115         }
116 }
117
118 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
119                                     struct rpc_clnt *clnt)
120 {
121         static uint32_t clntid;
122         const char *dir_name = clnt->cl_program->pipe_dir_name;
123         char name[15];
124         struct dentry *dir, *dentry;
125
126         dir = rpc_d_lookup_sb(sb, dir_name);
127         if (dir == NULL) {
128                 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
129                 return dir;
130         }
131         for (;;) {
132                 snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
133                 name[sizeof(name) - 1] = '\0';
134                 dentry = rpc_create_client_dir(dir, name, clnt);
135                 if (!IS_ERR(dentry))
136                         break;
137                 if (dentry == ERR_PTR(-EEXIST))
138                         continue;
139                 printk(KERN_INFO "RPC: Couldn't create pipefs entry"
140                                 " %s/%s, error %ld\n",
141                                 dir_name, name, PTR_ERR(dentry));
142                 break;
143         }
144         dput(dir);
145         return dentry;
146 }
147
148 static int
149 rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
150 {
151         struct dentry *dentry;
152
153         if (clnt->cl_program->pipe_dir_name != NULL) {
154                 dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
155                 if (IS_ERR(dentry))
156                         return PTR_ERR(dentry);
157         }
158         return 0;
159 }
160
161 static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
162 {
163         if (clnt->cl_program->pipe_dir_name == NULL)
164                 return 1;
165
166         switch (event) {
167         case RPC_PIPEFS_MOUNT:
168                 if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
169                         return 1;
170                 if (atomic_read(&clnt->cl_count) == 0)
171                         return 1;
172                 break;
173         case RPC_PIPEFS_UMOUNT:
174                 if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
175                         return 1;
176                 break;
177         }
178         return 0;
179 }
180
181 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
182                                    struct super_block *sb)
183 {
184         struct dentry *dentry;
185
186         switch (event) {
187         case RPC_PIPEFS_MOUNT:
188                 dentry = rpc_setup_pipedir_sb(sb, clnt);
189                 if (!dentry)
190                         return -ENOENT;
191                 if (IS_ERR(dentry))
192                         return PTR_ERR(dentry);
193                 break;
194         case RPC_PIPEFS_UMOUNT:
195                 __rpc_clnt_remove_pipedir(clnt);
196                 break;
197         default:
198                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
199                 return -ENOTSUPP;
200         }
201         return 0;
202 }
203
204 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
205                                 struct super_block *sb)
206 {
207         int error = 0;
208
209         for (;; clnt = clnt->cl_parent) {
210                 if (!rpc_clnt_skip_event(clnt, event))
211                         error = __rpc_clnt_handle_event(clnt, event, sb);
212                 if (error || clnt == clnt->cl_parent)
213                         break;
214         }
215         return error;
216 }
217
218 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
219 {
220         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
221         struct rpc_clnt *clnt;
222
223         spin_lock(&sn->rpc_client_lock);
224         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
225                 if (rpc_clnt_skip_event(clnt, event))
226                         continue;
227                 spin_unlock(&sn->rpc_client_lock);
228                 return clnt;
229         }
230         spin_unlock(&sn->rpc_client_lock);
231         return NULL;
232 }
233
234 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
235                             void *ptr)
236 {
237         struct super_block *sb = ptr;
238         struct rpc_clnt *clnt;
239         int error = 0;
240
241         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
242                 error = __rpc_pipefs_event(clnt, event, sb);
243                 if (error)
244                         break;
245         }
246         return error;
247 }
248
249 static struct notifier_block rpc_clients_block = {
250         .notifier_call  = rpc_pipefs_event,
251         .priority       = SUNRPC_PIPEFS_RPC_PRIO,
252 };
253
254 int rpc_clients_notifier_register(void)
255 {
256         return rpc_pipefs_notifier_register(&rpc_clients_block);
257 }
258
259 void rpc_clients_notifier_unregister(void)
260 {
261         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
262 }
263
264 static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
265                 struct rpc_xprt *xprt,
266                 const struct rpc_timeout *timeout)
267 {
268         struct rpc_xprt *old;
269
270         spin_lock(&clnt->cl_lock);
271         old = rcu_dereference_protected(clnt->cl_xprt,
272                         lockdep_is_held(&clnt->cl_lock));
273
274         if (!xprt_bound(xprt))
275                 clnt->cl_autobind = 1;
276
277         clnt->cl_timeout = timeout;
278         rcu_assign_pointer(clnt->cl_xprt, xprt);
279         spin_unlock(&clnt->cl_lock);
280
281         return old;
282 }
283
284 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
285 {
286         clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
287                         nodename, sizeof(clnt->cl_nodename));
288 }
289
290 static int rpc_client_register(struct rpc_clnt *clnt,
291                                rpc_authflavor_t pseudoflavor,
292                                const char *client_name)
293 {
294         struct rpc_auth_create_args auth_args = {
295                 .pseudoflavor = pseudoflavor,
296                 .target_name = client_name,
297         };
298         struct rpc_auth *auth;
299         struct net *net = rpc_net_ns(clnt);
300         struct super_block *pipefs_sb;
301         int err;
302
303         rpc_clnt_debugfs_register(clnt);
304
305         pipefs_sb = rpc_get_sb_net(net);
306         if (pipefs_sb) {
307                 err = rpc_setup_pipedir(pipefs_sb, clnt);
308                 if (err)
309                         goto out;
310         }
311
312         rpc_register_client(clnt);
313         if (pipefs_sb)
314                 rpc_put_sb_net(net);
315
316         auth = rpcauth_create(&auth_args, clnt);
317         if (IS_ERR(auth)) {
318                 dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
319                                 pseudoflavor);
320                 err = PTR_ERR(auth);
321                 goto err_auth;
322         }
323         return 0;
324 err_auth:
325         pipefs_sb = rpc_get_sb_net(net);
326         rpc_unregister_client(clnt);
327         __rpc_clnt_remove_pipedir(clnt);
328 out:
329         if (pipefs_sb)
330                 rpc_put_sb_net(net);
331         rpc_sysfs_client_destroy(clnt);
332         rpc_clnt_debugfs_unregister(clnt);
333         return err;
334 }
335
336 static DEFINE_IDA(rpc_clids);
337
338 void rpc_cleanup_clids(void)
339 {
340         ida_destroy(&rpc_clids);
341 }
342
343 static int rpc_alloc_clid(struct rpc_clnt *clnt)
344 {
345         int clid;
346
347         clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
348         if (clid < 0)
349                 return clid;
350         clnt->cl_clid = clid;
351         return 0;
352 }
353
354 static void rpc_free_clid(struct rpc_clnt *clnt)
355 {
356         ida_simple_remove(&rpc_clids, clnt->cl_clid);
357 }
358
359 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
360                 struct rpc_xprt_switch *xps,
361                 struct rpc_xprt *xprt,
362                 struct rpc_clnt *parent)
363 {
364         const struct rpc_program *program = args->program;
365         const struct rpc_version *version;
366         struct rpc_clnt *clnt = NULL;
367         const struct rpc_timeout *timeout;
368         const char *nodename = args->nodename;
369         int err;
370
371         err = rpciod_up();
372         if (err)
373                 goto out_no_rpciod;
374
375         err = -EINVAL;
376         if (args->version >= program->nrvers)
377                 goto out_err;
378         version = program->version[args->version];
379         if (version == NULL)
380                 goto out_err;
381
382         err = -ENOMEM;
383         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
384         if (!clnt)
385                 goto out_err;
386         clnt->cl_parent = parent ? : clnt;
387
388         err = rpc_alloc_clid(clnt);
389         if (err)
390                 goto out_no_clid;
391
392         clnt->cl_cred     = get_cred(args->cred);
393         clnt->cl_procinfo = version->procs;
394         clnt->cl_maxproc  = version->nrprocs;
395         clnt->cl_prog     = args->prognumber ? : program->number;
396         clnt->cl_vers     = version->number;
397         clnt->cl_stats    = program->stats;
398         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
399         rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
400         err = -ENOMEM;
401         if (clnt->cl_metrics == NULL)
402                 goto out_no_stats;
403         clnt->cl_program  = program;
404         INIT_LIST_HEAD(&clnt->cl_tasks);
405         spin_lock_init(&clnt->cl_lock);
406
407         timeout = xprt->timeout;
408         if (args->timeout != NULL) {
409                 memcpy(&clnt->cl_timeout_default, args->timeout,
410                                 sizeof(clnt->cl_timeout_default));
411                 timeout = &clnt->cl_timeout_default;
412         }
413
414         rpc_clnt_set_transport(clnt, xprt, timeout);
415         xprt->main = true;
416         xprt_iter_init(&clnt->cl_xpi, xps);
417         xprt_switch_put(xps);
418
419         clnt->cl_rtt = &clnt->cl_rtt_default;
420         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
421
422         atomic_set(&clnt->cl_count, 1);
423
424         if (nodename == NULL)
425                 nodename = utsname()->nodename;
426         /* save the nodename */
427         rpc_clnt_set_nodename(clnt, nodename);
428
429         rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
430         err = rpc_client_register(clnt, args->authflavor, args->client_name);
431         if (err)
432                 goto out_no_path;
433         if (parent)
434                 atomic_inc(&parent->cl_count);
435
436         trace_rpc_clnt_new(clnt, xprt, program->name, args->servername);
437         return clnt;
438
439 out_no_path:
440         rpc_free_iostats(clnt->cl_metrics);
441 out_no_stats:
442         put_cred(clnt->cl_cred);
443         rpc_free_clid(clnt);
444 out_no_clid:
445         kfree(clnt);
446 out_err:
447         rpciod_down();
448 out_no_rpciod:
449         xprt_switch_put(xps);
450         xprt_put(xprt);
451         trace_rpc_clnt_new_err(program->name, args->servername, err);
452         return ERR_PTR(err);
453 }
454
455 static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
456                                         struct rpc_xprt *xprt)
457 {
458         struct rpc_clnt *clnt = NULL;
459         struct rpc_xprt_switch *xps;
460
461         if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
462                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
463                 xps = args->bc_xprt->xpt_bc_xps;
464                 xprt_switch_get(xps);
465         } else {
466                 xps = xprt_switch_alloc(xprt, GFP_KERNEL);
467                 if (xps == NULL) {
468                         xprt_put(xprt);
469                         return ERR_PTR(-ENOMEM);
470                 }
471                 if (xprt->bc_xprt) {
472                         xprt_switch_get(xps);
473                         xprt->bc_xprt->xpt_bc_xps = xps;
474                 }
475         }
476         clnt = rpc_new_client(args, xps, xprt, NULL);
477         if (IS_ERR(clnt))
478                 return clnt;
479
480         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
481                 int err = rpc_ping(clnt);
482                 if (err != 0) {
483                         rpc_shutdown_client(clnt);
484                         return ERR_PTR(err);
485                 }
486         }
487
488         clnt->cl_softrtry = 1;
489         if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
490                 clnt->cl_softrtry = 0;
491                 if (args->flags & RPC_CLNT_CREATE_SOFTERR)
492                         clnt->cl_softerr = 1;
493         }
494
495         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
496                 clnt->cl_autobind = 1;
497         if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
498                 clnt->cl_noretranstimeo = 1;
499         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
500                 clnt->cl_discrtry = 1;
501         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
502                 clnt->cl_chatty = 1;
503
504         return clnt;
505 }
506
507 /**
508  * rpc_create - create an RPC client and transport with one call
509  * @args: rpc_clnt create argument structure
510  *
511  * Creates and initializes an RPC transport and an RPC client.
512  *
513  * It can ping the server in order to determine if it is up, and to see if
514  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
515  * this behavior so asynchronous tasks can also use rpc_create.
516  */
517 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
518 {
519         struct rpc_xprt *xprt;
520         struct xprt_create xprtargs = {
521                 .net = args->net,
522                 .ident = args->protocol,
523                 .srcaddr = args->saddress,
524                 .dstaddr = args->address,
525                 .addrlen = args->addrsize,
526                 .servername = args->servername,
527                 .bc_xprt = args->bc_xprt,
528         };
529         char servername[48];
530         struct rpc_clnt *clnt;
531         int i;
532
533         if (args->bc_xprt) {
534                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
535                 xprt = args->bc_xprt->xpt_bc_xprt;
536                 if (xprt) {
537                         xprt_get(xprt);
538                         return rpc_create_xprt(args, xprt);
539                 }
540         }
541
542         if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
543                 xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
544         if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
545                 xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
546         /*
547          * If the caller chooses not to specify a hostname, whip
548          * up a string representation of the passed-in address.
549          */
550         if (xprtargs.servername == NULL) {
551                 struct sockaddr_un *sun =
552                                 (struct sockaddr_un *)args->address;
553                 struct sockaddr_in *sin =
554                                 (struct sockaddr_in *)args->address;
555                 struct sockaddr_in6 *sin6 =
556                                 (struct sockaddr_in6 *)args->address;
557
558                 servername[0] = '\0';
559                 switch (args->address->sa_family) {
560                 case AF_LOCAL:
561                         snprintf(servername, sizeof(servername), "%s",
562                                  sun->sun_path);
563                         break;
564                 case AF_INET:
565                         snprintf(servername, sizeof(servername), "%pI4",
566                                  &sin->sin_addr.s_addr);
567                         break;
568                 case AF_INET6:
569                         snprintf(servername, sizeof(servername), "%pI6",
570                                  &sin6->sin6_addr);
571                         break;
572                 default:
573                         /* caller wants default server name, but
574                          * address family isn't recognized. */
575                         return ERR_PTR(-EINVAL);
576                 }
577                 xprtargs.servername = servername;
578         }
579
580         xprt = xprt_create_transport(&xprtargs);
581         if (IS_ERR(xprt))
582                 return (struct rpc_clnt *)xprt;
583
584         /*
585          * By default, kernel RPC client connects from a reserved port.
586          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
587          * but it is always enabled for rpciod, which handles the connect
588          * operation.
589          */
590         xprt->resvport = 1;
591         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
592                 xprt->resvport = 0;
593         xprt->reuseport = 0;
594         if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
595                 xprt->reuseport = 1;
596
597         clnt = rpc_create_xprt(args, xprt);
598         if (IS_ERR(clnt) || args->nconnect <= 1)
599                 return clnt;
600
601         for (i = 0; i < args->nconnect - 1; i++) {
602                 if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
603                         break;
604         }
605         return clnt;
606 }
607 EXPORT_SYMBOL_GPL(rpc_create);
608
609 /*
610  * This function clones the RPC client structure. It allows us to share the
611  * same transport while varying parameters such as the authentication
612  * flavour.
613  */
614 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
615                                            struct rpc_clnt *clnt)
616 {
617         struct rpc_xprt_switch *xps;
618         struct rpc_xprt *xprt;
619         struct rpc_clnt *new;
620         int err;
621
622         err = -ENOMEM;
623         rcu_read_lock();
624         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
625         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
626         rcu_read_unlock();
627         if (xprt == NULL || xps == NULL) {
628                 xprt_put(xprt);
629                 xprt_switch_put(xps);
630                 goto out_err;
631         }
632         args->servername = xprt->servername;
633         args->nodename = clnt->cl_nodename;
634
635         new = rpc_new_client(args, xps, xprt, clnt);
636         if (IS_ERR(new))
637                 return new;
638
639         /* Turn off autobind on clones */
640         new->cl_autobind = 0;
641         new->cl_softrtry = clnt->cl_softrtry;
642         new->cl_softerr = clnt->cl_softerr;
643         new->cl_noretranstimeo = clnt->cl_noretranstimeo;
644         new->cl_discrtry = clnt->cl_discrtry;
645         new->cl_chatty = clnt->cl_chatty;
646         new->cl_principal = clnt->cl_principal;
647         return new;
648
649 out_err:
650         trace_rpc_clnt_clone_err(clnt, err);
651         return ERR_PTR(err);
652 }
653
654 /**
655  * rpc_clone_client - Clone an RPC client structure
656  *
657  * @clnt: RPC client whose parameters are copied
658  *
659  * Returns a fresh RPC client or an ERR_PTR.
660  */
661 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
662 {
663         struct rpc_create_args args = {
664                 .program        = clnt->cl_program,
665                 .prognumber     = clnt->cl_prog,
666                 .version        = clnt->cl_vers,
667                 .authflavor     = clnt->cl_auth->au_flavor,
668                 .cred           = clnt->cl_cred,
669         };
670         return __rpc_clone_client(&args, clnt);
671 }
672 EXPORT_SYMBOL_GPL(rpc_clone_client);
673
674 /**
675  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
676  *
677  * @clnt: RPC client whose parameters are copied
678  * @flavor: security flavor for new client
679  *
680  * Returns a fresh RPC client or an ERR_PTR.
681  */
682 struct rpc_clnt *
683 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
684 {
685         struct rpc_create_args args = {
686                 .program        = clnt->cl_program,
687                 .prognumber     = clnt->cl_prog,
688                 .version        = clnt->cl_vers,
689                 .authflavor     = flavor,
690                 .cred           = clnt->cl_cred,
691         };
692         return __rpc_clone_client(&args, clnt);
693 }
694 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
695
696 /**
697  * rpc_switch_client_transport: switch the RPC transport on the fly
698  * @clnt: pointer to a struct rpc_clnt
699  * @args: pointer to the new transport arguments
700  * @timeout: pointer to the new timeout parameters
701  *
702  * This function allows the caller to switch the RPC transport for the
703  * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
704  * server, for instance.  It assumes that the caller has ensured that
705  * there are no active RPC tasks by using some form of locking.
706  *
707  * Returns zero if "clnt" is now using the new xprt.  Otherwise a
708  * negative errno is returned, and "clnt" continues to use the old
709  * xprt.
710  */
711 int rpc_switch_client_transport(struct rpc_clnt *clnt,
712                 struct xprt_create *args,
713                 const struct rpc_timeout *timeout)
714 {
715         const struct rpc_timeout *old_timeo;
716         rpc_authflavor_t pseudoflavor;
717         struct rpc_xprt_switch *xps, *oldxps;
718         struct rpc_xprt *xprt, *old;
719         struct rpc_clnt *parent;
720         int err;
721
722         xprt = xprt_create_transport(args);
723         if (IS_ERR(xprt))
724                 return PTR_ERR(xprt);
725
726         xps = xprt_switch_alloc(xprt, GFP_KERNEL);
727         if (xps == NULL) {
728                 xprt_put(xprt);
729                 return -ENOMEM;
730         }
731
732         pseudoflavor = clnt->cl_auth->au_flavor;
733
734         old_timeo = clnt->cl_timeout;
735         old = rpc_clnt_set_transport(clnt, xprt, timeout);
736         oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
737
738         rpc_unregister_client(clnt);
739         __rpc_clnt_remove_pipedir(clnt);
740         rpc_sysfs_client_destroy(clnt);
741         rpc_clnt_debugfs_unregister(clnt);
742
743         /*
744          * A new transport was created.  "clnt" therefore
745          * becomes the root of a new cl_parent tree.  clnt's
746          * children, if it has any, still point to the old xprt.
747          */
748         parent = clnt->cl_parent;
749         clnt->cl_parent = clnt;
750
751         /*
752          * The old rpc_auth cache cannot be re-used.  GSS
753          * contexts in particular are between a single
754          * client and server.
755          */
756         err = rpc_client_register(clnt, pseudoflavor, NULL);
757         if (err)
758                 goto out_revert;
759
760         synchronize_rcu();
761         if (parent != clnt)
762                 rpc_release_client(parent);
763         xprt_switch_put(oldxps);
764         xprt_put(old);
765         trace_rpc_clnt_replace_xprt(clnt);
766         return 0;
767
768 out_revert:
769         xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
770         rpc_clnt_set_transport(clnt, old, old_timeo);
771         clnt->cl_parent = parent;
772         rpc_client_register(clnt, pseudoflavor, NULL);
773         xprt_switch_put(xps);
774         xprt_put(xprt);
775         trace_rpc_clnt_replace_xprt_err(clnt);
776         return err;
777 }
778 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
779
780 static
781 int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
782 {
783         struct rpc_xprt_switch *xps;
784
785         rcu_read_lock();
786         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
787         rcu_read_unlock();
788         if (xps == NULL)
789                 return -EAGAIN;
790         xprt_iter_init_listall(xpi, xps);
791         xprt_switch_put(xps);
792         return 0;
793 }
794
795 /**
796  * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
797  * @clnt: pointer to client
798  * @fn: function to apply
799  * @data: void pointer to function data
800  *
801  * Iterates through the list of RPC transports currently attached to the
802  * client and applies the function fn(clnt, xprt, data).
803  *
804  * On error, the iteration stops, and the function returns the error value.
805  */
806 int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
807                 int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
808                 void *data)
809 {
810         struct rpc_xprt_iter xpi;
811         int ret;
812
813         ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
814         if (ret)
815                 return ret;
816         for (;;) {
817                 struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
818
819                 if (!xprt)
820                         break;
821                 ret = fn(clnt, xprt, data);
822                 xprt_put(xprt);
823                 if (ret < 0)
824                         break;
825         }
826         xprt_iter_destroy(&xpi);
827         return ret;
828 }
829 EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
830
831 /*
832  * Kill all tasks for the given client.
833  * XXX: kill their descendants as well?
834  */
835 void rpc_killall_tasks(struct rpc_clnt *clnt)
836 {
837         struct rpc_task *rovr;
838
839
840         if (list_empty(&clnt->cl_tasks))
841                 return;
842
843         /*
844          * Spin lock all_tasks to prevent changes...
845          */
846         trace_rpc_clnt_killall(clnt);
847         spin_lock(&clnt->cl_lock);
848         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
849                 rpc_signal_task(rovr);
850         spin_unlock(&clnt->cl_lock);
851 }
852 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
853
854 /*
855  * Properly shut down an RPC client, terminating all outstanding
856  * requests.
857  */
858 void rpc_shutdown_client(struct rpc_clnt *clnt)
859 {
860         might_sleep();
861
862         trace_rpc_clnt_shutdown(clnt);
863
864         while (!list_empty(&clnt->cl_tasks)) {
865                 rpc_killall_tasks(clnt);
866                 wait_event_timeout(destroy_wait,
867                         list_empty(&clnt->cl_tasks), 1*HZ);
868         }
869
870         rpc_release_client(clnt);
871 }
872 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
873
874 /*
875  * Free an RPC client
876  */
877 static void rpc_free_client_work(struct work_struct *work)
878 {
879         struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
880
881         trace_rpc_clnt_free(clnt);
882
883         /* These might block on processes that might allocate memory,
884          * so they cannot be called in rpciod, so they are handled separately
885          * here.
886          */
887         rpc_sysfs_client_destroy(clnt);
888         rpc_clnt_debugfs_unregister(clnt);
889         rpc_free_clid(clnt);
890         rpc_clnt_remove_pipedir(clnt);
891         xprt_put(rcu_dereference_raw(clnt->cl_xprt));
892
893         kfree(clnt);
894         rpciod_down();
895 }
896 static struct rpc_clnt *
897 rpc_free_client(struct rpc_clnt *clnt)
898 {
899         struct rpc_clnt *parent = NULL;
900
901         trace_rpc_clnt_release(clnt);
902         if (clnt->cl_parent != clnt)
903                 parent = clnt->cl_parent;
904         rpc_unregister_client(clnt);
905         rpc_free_iostats(clnt->cl_metrics);
906         clnt->cl_metrics = NULL;
907         xprt_iter_destroy(&clnt->cl_xpi);
908         put_cred(clnt->cl_cred);
909
910         INIT_WORK(&clnt->cl_work, rpc_free_client_work);
911         schedule_work(&clnt->cl_work);
912         return parent;
913 }
914
915 /*
916  * Free an RPC client
917  */
918 static struct rpc_clnt *
919 rpc_free_auth(struct rpc_clnt *clnt)
920 {
921         if (clnt->cl_auth == NULL)
922                 return rpc_free_client(clnt);
923
924         /*
925          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
926          *       release remaining GSS contexts. This mechanism ensures
927          *       that it can do so safely.
928          */
929         atomic_inc(&clnt->cl_count);
930         rpcauth_release(clnt->cl_auth);
931         clnt->cl_auth = NULL;
932         if (atomic_dec_and_test(&clnt->cl_count))
933                 return rpc_free_client(clnt);
934         return NULL;
935 }
936
937 /*
938  * Release reference to the RPC client
939  */
940 void
941 rpc_release_client(struct rpc_clnt *clnt)
942 {
943         do {
944                 if (list_empty(&clnt->cl_tasks))
945                         wake_up(&destroy_wait);
946                 if (!atomic_dec_and_test(&clnt->cl_count))
947                         break;
948                 clnt = rpc_free_auth(clnt);
949         } while (clnt != NULL);
950 }
951 EXPORT_SYMBOL_GPL(rpc_release_client);
952
953 /**
954  * rpc_bind_new_program - bind a new RPC program to an existing client
955  * @old: old rpc_client
956  * @program: rpc program to set
957  * @vers: rpc program version
958  *
959  * Clones the rpc client and sets up a new RPC program. This is mainly
960  * of use for enabling different RPC programs to share the same transport.
961  * The Sun NFSv2/v3 ACL protocol can do this.
962  */
963 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
964                                       const struct rpc_program *program,
965                                       u32 vers)
966 {
967         struct rpc_create_args args = {
968                 .program        = program,
969                 .prognumber     = program->number,
970                 .version        = vers,
971                 .authflavor     = old->cl_auth->au_flavor,
972                 .cred           = old->cl_cred,
973         };
974         struct rpc_clnt *clnt;
975         int err;
976
977         clnt = __rpc_clone_client(&args, old);
978         if (IS_ERR(clnt))
979                 goto out;
980         err = rpc_ping(clnt);
981         if (err != 0) {
982                 rpc_shutdown_client(clnt);
983                 clnt = ERR_PTR(err);
984         }
985 out:
986         return clnt;
987 }
988 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
989
990 struct rpc_xprt *
991 rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
992 {
993         struct rpc_xprt_switch *xps;
994
995         if (!xprt)
996                 return NULL;
997         rcu_read_lock();
998         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
999         atomic_long_inc(&xps->xps_queuelen);
1000         rcu_read_unlock();
1001         atomic_long_inc(&xprt->queuelen);
1002
1003         return xprt;
1004 }
1005
1006 static void
1007 rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1008 {
1009         struct rpc_xprt_switch *xps;
1010
1011         atomic_long_dec(&xprt->queuelen);
1012         rcu_read_lock();
1013         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1014         atomic_long_dec(&xps->xps_queuelen);
1015         rcu_read_unlock();
1016
1017         xprt_put(xprt);
1018 }
1019
1020 void rpc_task_release_transport(struct rpc_task *task)
1021 {
1022         struct rpc_xprt *xprt = task->tk_xprt;
1023
1024         if (xprt) {
1025                 task->tk_xprt = NULL;
1026                 if (task->tk_client)
1027                         rpc_task_release_xprt(task->tk_client, xprt);
1028                 else
1029                         xprt_put(xprt);
1030         }
1031 }
1032 EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1033
1034 void rpc_task_release_client(struct rpc_task *task)
1035 {
1036         struct rpc_clnt *clnt = task->tk_client;
1037
1038         rpc_task_release_transport(task);
1039         if (clnt != NULL) {
1040                 /* Remove from client task list */
1041                 spin_lock(&clnt->cl_lock);
1042                 list_del(&task->tk_task);
1043                 spin_unlock(&clnt->cl_lock);
1044                 task->tk_client = NULL;
1045
1046                 rpc_release_client(clnt);
1047         }
1048 }
1049
1050 static struct rpc_xprt *
1051 rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1052 {
1053         struct rpc_xprt *xprt;
1054
1055         rcu_read_lock();
1056         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1057         rcu_read_unlock();
1058         return rpc_task_get_xprt(clnt, xprt);
1059 }
1060
1061 static struct rpc_xprt *
1062 rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1063 {
1064         return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1065 }
1066
1067 static
1068 void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1069 {
1070         if (task->tk_xprt)
1071                 return;
1072         if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1073                 task->tk_xprt = rpc_task_get_first_xprt(clnt);
1074         else
1075                 task->tk_xprt = rpc_task_get_next_xprt(clnt);
1076 }
1077
1078 static
1079 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1080 {
1081
1082         if (clnt != NULL) {
1083                 rpc_task_set_transport(task, clnt);
1084                 task->tk_client = clnt;
1085                 atomic_inc(&clnt->cl_count);
1086                 if (clnt->cl_softrtry)
1087                         task->tk_flags |= RPC_TASK_SOFT;
1088                 if (clnt->cl_softerr)
1089                         task->tk_flags |= RPC_TASK_TIMEOUT;
1090                 if (clnt->cl_noretranstimeo)
1091                         task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1092                 if (atomic_read(&clnt->cl_swapper))
1093                         task->tk_flags |= RPC_TASK_SWAPPER;
1094                 /* Add to the client's list of all tasks */
1095                 spin_lock(&clnt->cl_lock);
1096                 list_add_tail(&task->tk_task, &clnt->cl_tasks);
1097                 spin_unlock(&clnt->cl_lock);
1098         }
1099 }
1100
1101 static void
1102 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1103 {
1104         if (msg != NULL) {
1105                 task->tk_msg.rpc_proc = msg->rpc_proc;
1106                 task->tk_msg.rpc_argp = msg->rpc_argp;
1107                 task->tk_msg.rpc_resp = msg->rpc_resp;
1108                 task->tk_msg.rpc_cred = msg->rpc_cred;
1109                 if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1110                         get_cred(task->tk_msg.rpc_cred);
1111         }
1112 }
1113
1114 /*
1115  * Default callback for async RPC calls
1116  */
1117 static void
1118 rpc_default_callback(struct rpc_task *task, void *data)
1119 {
1120 }
1121
1122 static const struct rpc_call_ops rpc_default_ops = {
1123         .rpc_call_done = rpc_default_callback,
1124 };
1125
1126 /**
1127  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1128  * @task_setup_data: pointer to task initialisation data
1129  */
1130 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1131 {
1132         struct rpc_task *task;
1133
1134         task = rpc_new_task(task_setup_data);
1135
1136         if (!RPC_IS_ASYNC(task))
1137                 task->tk_flags |= RPC_TASK_CRED_NOREF;
1138
1139         rpc_task_set_client(task, task_setup_data->rpc_client);
1140         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1141
1142         if (task->tk_action == NULL)
1143                 rpc_call_start(task);
1144
1145         atomic_inc(&task->tk_count);
1146         rpc_execute(task);
1147         return task;
1148 }
1149 EXPORT_SYMBOL_GPL(rpc_run_task);
1150
1151 /**
1152  * rpc_call_sync - Perform a synchronous RPC call
1153  * @clnt: pointer to RPC client
1154  * @msg: RPC call parameters
1155  * @flags: RPC call flags
1156  */
1157 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1158 {
1159         struct rpc_task *task;
1160         struct rpc_task_setup task_setup_data = {
1161                 .rpc_client = clnt,
1162                 .rpc_message = msg,
1163                 .callback_ops = &rpc_default_ops,
1164                 .flags = flags,
1165         };
1166         int status;
1167
1168         WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1169         if (flags & RPC_TASK_ASYNC) {
1170                 rpc_release_calldata(task_setup_data.callback_ops,
1171                         task_setup_data.callback_data);
1172                 return -EINVAL;
1173         }
1174
1175         task = rpc_run_task(&task_setup_data);
1176         if (IS_ERR(task))
1177                 return PTR_ERR(task);
1178         status = task->tk_status;
1179         rpc_put_task(task);
1180         return status;
1181 }
1182 EXPORT_SYMBOL_GPL(rpc_call_sync);
1183
1184 /**
1185  * rpc_call_async - Perform an asynchronous RPC call
1186  * @clnt: pointer to RPC client
1187  * @msg: RPC call parameters
1188  * @flags: RPC call flags
1189  * @tk_ops: RPC call ops
1190  * @data: user call data
1191  */
1192 int
1193 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1194                const struct rpc_call_ops *tk_ops, void *data)
1195 {
1196         struct rpc_task *task;
1197         struct rpc_task_setup task_setup_data = {
1198                 .rpc_client = clnt,
1199                 .rpc_message = msg,
1200                 .callback_ops = tk_ops,
1201                 .callback_data = data,
1202                 .flags = flags|RPC_TASK_ASYNC,
1203         };
1204
1205         task = rpc_run_task(&task_setup_data);
1206         if (IS_ERR(task))
1207                 return PTR_ERR(task);
1208         rpc_put_task(task);
1209         return 0;
1210 }
1211 EXPORT_SYMBOL_GPL(rpc_call_async);
1212
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void call_bc_encode(struct rpc_task *task);

/**
 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
 * rpc_execute against it
 * @req: RPC request
 *
 * The task starts in the call_bc_encode state.  It is returned with a
 * second reference taken on tk_count.
 */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
{
	struct rpc_task_setup task_setup_data = {
		.callback_ops	= &rpc_default_ops,
		.flags		= RPC_TASK_SOFTCONN |
				  RPC_TASK_NO_RETRANS_TIMEOUT,
	};
	struct rpc_task *task;

	dprintk("RPC: rpc_run_bc_task req= %p\n", req);

	/*
	 * Create an rpc_task to send the data
	 */
	task = rpc_new_task(&task_setup_data);
	xprt_init_bc_request(req, task);
	task->tk_action = call_bc_encode;

	atomic_inc(&task->tk_count);
	/* A freshly created task is expected to hold exactly two references here. */
	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);

	rpc_execute(task);

	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
	return task;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1246
1247 /**
1248  * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1249  * @req: RPC request to prepare
1250  * @pages: vector of struct page pointers
1251  * @base: offset in first page where receive should start, in bytes
1252  * @len: expected size of the upper layer data payload, in bytes
1253  * @hdrsize: expected size of upper layer reply header, in XDR words
1254  *
1255  */
1256 void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1257                              unsigned int base, unsigned int len,
1258                              unsigned int hdrsize)
1259 {
1260         hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1261
1262         xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1263         trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1264 }
1265 EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
1266
1267 void
1268 rpc_call_start(struct rpc_task *task)
1269 {
1270         task->tk_action = call_start;
1271 }
1272 EXPORT_SYMBOL_GPL(rpc_call_start);
1273
1274 /**
1275  * rpc_peeraddr - extract remote peer address from clnt's xprt
1276  * @clnt: RPC client structure
1277  * @buf: target buffer
1278  * @bufsize: length of target buffer
1279  *
1280  * Returns the number of bytes that are actually in the stored address.
1281  */
1282 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1283 {
1284         size_t bytes;
1285         struct rpc_xprt *xprt;
1286
1287         rcu_read_lock();
1288         xprt = rcu_dereference(clnt->cl_xprt);
1289
1290         bytes = xprt->addrlen;
1291         if (bytes > bufsize)
1292                 bytes = bufsize;
1293         memcpy(buf, &xprt->addr, bytes);
1294         rcu_read_unlock();
1295
1296         return bytes;
1297 }
1298 EXPORT_SYMBOL_GPL(rpc_peeraddr);
1299
1300 /**
1301  * rpc_peeraddr2str - return remote peer address in printable format
1302  * @clnt: RPC client structure
1303  * @format: address format
1304  *
1305  * NB: the lifetime of the memory referenced by the returned pointer is
1306  * the same as the rpc_xprt itself.  As long as the caller uses this
1307  * pointer, it must hold the RCU read lock.
1308  */
1309 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1310                              enum rpc_display_format_t format)
1311 {
1312         struct rpc_xprt *xprt;
1313
1314         xprt = rcu_dereference(clnt->cl_xprt);
1315
1316         if (xprt->address_strings[format] != NULL)
1317                 return xprt->address_strings[format];
1318         else
1319                 return "unprintable";
1320 }
1321 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1322
1323 static const struct sockaddr_in rpc_inaddr_loopback = {
1324         .sin_family             = AF_INET,
1325         .sin_addr.s_addr        = htonl(INADDR_ANY),
1326 };
1327
1328 static const struct sockaddr_in6 rpc_in6addr_loopback = {
1329         .sin6_family            = AF_INET6,
1330         .sin6_addr              = IN6ADDR_ANY_INIT,
1331 };
1332
1333 /*
1334  * Try a getsockname() on a connected datagram socket.  Using a
1335  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1336  * This conserves the ephemeral port number space.
1337  *
1338  * Returns zero and fills in "buf" if successful; otherwise, a
1339  * negative errno is returned.
1340  */
1341 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1342                         struct sockaddr *buf)
1343 {
1344         struct socket *sock;
1345         int err;
1346
1347         err = __sock_create(net, sap->sa_family,
1348                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1349         if (err < 0) {
1350                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1351                 goto out;
1352         }
1353
1354         switch (sap->sa_family) {
1355         case AF_INET:
1356                 err = kernel_bind(sock,
1357                                 (struct sockaddr *)&rpc_inaddr_loopback,
1358                                 sizeof(rpc_inaddr_loopback));
1359                 break;
1360         case AF_INET6:
1361                 err = kernel_bind(sock,
1362                                 (struct sockaddr *)&rpc_in6addr_loopback,
1363                                 sizeof(rpc_in6addr_loopback));
1364                 break;
1365         default:
1366                 err = -EAFNOSUPPORT;
1367                 goto out;
1368         }
1369         if (err < 0) {
1370                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1371                 goto out_release;
1372         }
1373
1374         err = kernel_connect(sock, sap, salen, 0);
1375         if (err < 0) {
1376                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1377                 goto out_release;
1378         }
1379
1380         err = kernel_getsockname(sock, buf);
1381         if (err < 0) {
1382                 dprintk("RPC:       getsockname failed (%d)\n", err);
1383                 goto out_release;
1384         }
1385
1386         err = 0;
1387         if (buf->sa_family == AF_INET6) {
1388                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1389                 sin6->sin6_scope_id = 0;
1390         }
1391         dprintk("RPC:       %s succeeded\n", __func__);
1392
1393 out_release:
1394         sock_release(sock);
1395 out:
1396         return err;
1397 }
1398
1399 /*
1400  * Scraping a connected socket failed, so we don't have a useable
1401  * local address.  Fallback: generate an address that will prevent
1402  * the server from calling us back.
1403  *
1404  * Returns zero and fills in "buf" if successful; otherwise, a
1405  * negative errno is returned.
1406  */
1407 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1408 {
1409         switch (family) {
1410         case AF_INET:
1411                 if (buflen < sizeof(rpc_inaddr_loopback))
1412                         return -EINVAL;
1413                 memcpy(buf, &rpc_inaddr_loopback,
1414                                 sizeof(rpc_inaddr_loopback));
1415                 break;
1416         case AF_INET6:
1417                 if (buflen < sizeof(rpc_in6addr_loopback))
1418                         return -EINVAL;
1419                 memcpy(buf, &rpc_in6addr_loopback,
1420                                 sizeof(rpc_in6addr_loopback));
1421                 break;
1422         default:
1423                 dprintk("RPC:       %s: address family not supported\n",
1424                         __func__);
1425                 return -EAFNOSUPPORT;
1426         }
1427         dprintk("RPC:       %s: succeeded\n", __func__);
1428         return 0;
1429 }
1430
1431 /**
1432  * rpc_localaddr - discover local endpoint address for an RPC client
1433  * @clnt: RPC client structure
1434  * @buf: target buffer
1435  * @buflen: size of target buffer, in bytes
1436  *
1437  * Returns zero and fills in "buf" and "buflen" if successful;
1438  * otherwise, a negative errno is returned.
1439  *
1440  * This works even if the underlying transport is not currently connected,
1441  * or if the upper layer never previously provided a source address.
1442  *
1443  * The result of this function call is transient: multiple calls in
1444  * succession may give different results, depending on how local
1445  * networking configuration changes over time.
1446  */
1447 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1448 {
1449         struct sockaddr_storage address;
1450         struct sockaddr *sap = (struct sockaddr *)&address;
1451         struct rpc_xprt *xprt;
1452         struct net *net;
1453         size_t salen;
1454         int err;
1455
1456         rcu_read_lock();
1457         xprt = rcu_dereference(clnt->cl_xprt);
1458         salen = xprt->addrlen;
1459         memcpy(sap, &xprt->addr, salen);
1460         net = get_net(xprt->xprt_net);
1461         rcu_read_unlock();
1462
1463         rpc_set_port(sap, 0);
1464         err = rpc_sockname(net, sap, salen, buf);
1465         put_net(net);
1466         if (err != 0)
1467                 /* Couldn't discover local address, return ANYADDR */
1468                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1469         return 0;
1470 }
1471 EXPORT_SYMBOL_GPL(rpc_localaddr);
1472
1473 void
1474 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1475 {
1476         struct rpc_xprt *xprt;
1477
1478         rcu_read_lock();
1479         xprt = rcu_dereference(clnt->cl_xprt);
1480         if (xprt->ops->set_buffer_size)
1481                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1482         rcu_read_unlock();
1483 }
1484 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1485
1486 /**
1487  * rpc_net_ns - Get the network namespace for this RPC client
1488  * @clnt: RPC client to query
1489  *
1490  */
1491 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1492 {
1493         struct net *ret;
1494
1495         rcu_read_lock();
1496         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1497         rcu_read_unlock();
1498         return ret;
1499 }
1500 EXPORT_SYMBOL_GPL(rpc_net_ns);
1501
1502 /**
1503  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1504  * @clnt: RPC client to query
1505  *
1506  * For stream transports, this is one RPC record fragment (see RFC
1507  * 1831), as we don't support multi-record requests yet.  For datagram
1508  * transports, this is the size of an IP packet minus the IP, UDP, and
1509  * RPC header sizes.
1510  */
1511 size_t rpc_max_payload(struct rpc_clnt *clnt)
1512 {
1513         size_t ret;
1514
1515         rcu_read_lock();
1516         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1517         rcu_read_unlock();
1518         return ret;
1519 }
1520 EXPORT_SYMBOL_GPL(rpc_max_payload);
1521
1522 /**
1523  * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1524  * @clnt: RPC client to query
1525  */
1526 size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1527 {
1528         struct rpc_xprt *xprt;
1529         size_t ret;
1530
1531         rcu_read_lock();
1532         xprt = rcu_dereference(clnt->cl_xprt);
1533         ret = xprt->ops->bc_maxpayload(xprt);
1534         rcu_read_unlock();
1535         return ret;
1536 }
1537 EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1538
1539 unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1540 {
1541         struct rpc_xprt *xprt;
1542         unsigned int ret;
1543
1544         rcu_read_lock();
1545         xprt = rcu_dereference(clnt->cl_xprt);
1546         ret = xprt->ops->bc_num_slots(xprt);
1547         rcu_read_unlock();
1548         return ret;
1549 }
1550 EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1551
1552 /**
1553  * rpc_force_rebind - force transport to check that remote port is unchanged
1554  * @clnt: client to rebind
1555  *
1556  */
1557 void rpc_force_rebind(struct rpc_clnt *clnt)
1558 {
1559         if (clnt->cl_autobind) {
1560                 rcu_read_lock();
1561                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1562                 rcu_read_unlock();
1563         }
1564 }
1565 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1566
1567 static int
1568 __rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1569 {
1570         task->tk_status = 0;
1571         task->tk_rpc_status = 0;
1572         task->tk_action = action;
1573         return 1;
1574 }
1575
1576 /*
1577  * Restart an (async) RPC call. Usually called from within the
1578  * exit handler.
1579  */
1580 int
1581 rpc_restart_call(struct rpc_task *task)
1582 {
1583         return __rpc_restart_call(task, call_start);
1584 }
1585 EXPORT_SYMBOL_GPL(rpc_restart_call);
1586
1587 /*
1588  * Restart an (async) RPC call from the call_prepare state.
1589  * Usually called from within the exit handler.
1590  */
1591 int
1592 rpc_restart_call_prepare(struct rpc_task *task)
1593 {
1594         if (task->tk_ops->rpc_call_prepare != NULL)
1595                 return __rpc_restart_call(task, rpc_prepare_task);
1596         return rpc_restart_call(task);
1597 }
1598 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1599
1600 const char
1601 *rpc_proc_name(const struct rpc_task *task)
1602 {
1603         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1604
1605         if (proc) {
1606                 if (proc->p_name)
1607                         return proc->p_name;
1608                 else
1609                         return "NULL";
1610         } else
1611                 return "no proc";
1612 }
1613
1614 static void
1615 __rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
1616 {
1617         trace_rpc_call_rpcerror(task, tk_status, rpc_status);
1618         task->tk_rpc_status = rpc_status;
1619         rpc_exit(task, tk_status);
1620 }
1621
/*
 * Fail the task, using @status as both its task status and its RPC
 * status.
 */
static void rpc_call_rpcerror(struct rpc_task *task, int status)
{
	__rpc_call_rpcerror(task, status, status);
}
1627
1628 /*
1629  * 0.  Initial state
1630  *
1631  *     Other FSM states can be visited zero or more times, but
1632  *     this state is visited exactly once for each RPC.
1633  */
1634 static void
1635 call_start(struct rpc_task *task)
1636 {
1637         struct rpc_clnt *clnt = task->tk_client;
1638         int idx = task->tk_msg.rpc_proc->p_statidx;
1639
1640         trace_rpc_request(task);
1641
1642         /* Increment call count (version might not be valid for ping) */
1643         if (clnt->cl_program->version[clnt->cl_vers])
1644                 clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1645         clnt->cl_stats->rpccnt++;
1646         task->tk_action = call_reserve;
1647         rpc_task_set_transport(task, clnt);
1648 }
1649
1650 /*
1651  * 1.   Reserve an RPC call slot
1652  */
1653 static void
1654 call_reserve(struct rpc_task *task)
1655 {
1656         task->tk_status  = 0;
1657         task->tk_action  = call_reserveresult;
1658         xprt_reserve(task);
1659 }
1660
1661 static void call_retry_reserve(struct rpc_task *task);
1662
1663 /*
1664  * 1b.  Grok the result of xprt_reserve()
1665  */
1666 static void
1667 call_reserveresult(struct rpc_task *task)
1668 {
1669         int status = task->tk_status;
1670
1671         /*
1672          * After a call to xprt_reserve(), we must have either
1673          * a request slot or else an error status.
1674          */
1675         task->tk_status = 0;
1676         if (status >= 0) {
1677                 if (task->tk_rqstp) {
1678                         task->tk_action = call_refresh;
1679                         return;
1680                 }
1681
1682                 rpc_call_rpcerror(task, -EIO);
1683                 return;
1684         }
1685
1686         switch (status) {
1687         case -ENOMEM:
1688                 rpc_delay(task, HZ >> 2);
1689                 fallthrough;
1690         case -EAGAIN:   /* woken up; retry */
1691                 task->tk_action = call_retry_reserve;
1692                 return;
1693         default:
1694                 rpc_call_rpcerror(task, status);
1695         }
1696 }
1697
1698 /*
1699  * 1c.  Retry reserving an RPC call slot
1700  */
1701 static void
1702 call_retry_reserve(struct rpc_task *task)
1703 {
1704         task->tk_status  = 0;
1705         task->tk_action  = call_reserveresult;
1706         xprt_retry_reserve(task);
1707 }
1708
1709 /*
1710  * 2.   Bind and/or refresh the credentials
1711  */
1712 static void
1713 call_refresh(struct rpc_task *task)
1714 {
1715         task->tk_action = call_refreshresult;
1716         task->tk_status = 0;
1717         task->tk_client->cl_stats->rpcauthrefresh++;
1718         rpcauth_refreshcred(task);
1719 }
1720
1721 /*
1722  * 2a.  Process the results of a credential refresh
1723  */
1724 static void
1725 call_refreshresult(struct rpc_task *task)
1726 {
1727         int status = task->tk_status;
1728
1729         task->tk_status = 0;
1730         task->tk_action = call_refresh;
1731         switch (status) {
1732         case 0:
1733                 if (rpcauth_uptodatecred(task)) {
1734                         task->tk_action = call_allocate;
1735                         return;
1736                 }
1737                 /* Use rate-limiting and a max number of retries if refresh
1738                  * had status 0 but failed to update the cred.
1739                  */
1740                 fallthrough;
1741         case -ETIMEDOUT:
1742                 rpc_delay(task, 3*HZ);
1743                 fallthrough;
1744         case -EAGAIN:
1745                 status = -EACCES;
1746                 fallthrough;
1747         case -EKEYEXPIRED:
1748                 if (!task->tk_cred_retry)
1749                         break;
1750                 task->tk_cred_retry--;
1751                 trace_rpc_retry_refresh_status(task);
1752                 return;
1753         }
1754         trace_rpc_refresh_status(task);
1755         rpc_call_rpcerror(task, status);
1756 }
1757
1758 /*
1759  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1760  *      (Note: buffer memory is freed in xprt_release).
1761  */
1762 static void
1763 call_allocate(struct rpc_task *task)
1764 {
1765         const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1766         struct rpc_rqst *req = task->tk_rqstp;
1767         struct rpc_xprt *xprt = req->rq_xprt;
1768         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1769         int status;
1770
1771         task->tk_status = 0;
1772         task->tk_action = call_encode;
1773
1774         if (req->rq_buffer)
1775                 return;
1776
1777         if (proc->p_proc != 0) {
1778                 BUG_ON(proc->p_arglen == 0);
1779                 if (proc->p_decode != NULL)
1780                         BUG_ON(proc->p_replen == 0);
1781         }
1782
1783         /*
1784          * Calculate the size (in quads) of the RPC call
1785          * and reply headers, and convert both values
1786          * to byte sizes.
1787          */
1788         req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1789                            proc->p_arglen;
1790         req->rq_callsize <<= 2;
1791         /*
1792          * Note: the reply buffer must at minimum allocate enough space
1793          * for the 'struct accepted_reply' from RFC5531.
1794          */
1795         req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1796                         max_t(size_t, proc->p_replen, 2);
1797         req->rq_rcvsize <<= 2;
1798
1799         status = xprt->ops->buf_alloc(task);
1800         trace_rpc_buf_alloc(task, status);
1801         if (status == 0)
1802                 return;
1803         if (status != -ENOMEM) {
1804                 rpc_call_rpcerror(task, status);
1805                 return;
1806         }
1807
1808         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1809                 task->tk_action = call_allocate;
1810                 rpc_delay(task, HZ>>4);
1811                 return;
1812         }
1813
1814         rpc_call_rpcerror(task, -ERESTARTSYS);
1815 }
1816
1817 static int
1818 rpc_task_need_encode(struct rpc_task *task)
1819 {
1820         return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1821                 (!(task->tk_flags & RPC_TASK_SENT) ||
1822                  !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1823                  xprt_request_need_retransmit(task));
1824 }
1825
1826 static void
1827 rpc_xdr_encode(struct rpc_task *task)
1828 {
1829         struct rpc_rqst *req = task->tk_rqstp;
1830         struct xdr_stream xdr;
1831
1832         xdr_buf_init(&req->rq_snd_buf,
1833                      req->rq_buffer,
1834                      req->rq_callsize);
1835         xdr_buf_init(&req->rq_rcv_buf,
1836                      req->rq_rbuffer,
1837                      req->rq_rcvsize);
1838
1839         req->rq_reply_bytes_recvd = 0;
1840         req->rq_snd_buf.head[0].iov_len = 0;
1841         xdr_init_encode(&xdr, &req->rq_snd_buf,
1842                         req->rq_snd_buf.head[0].iov_base, req);
1843         xdr_free_bvec(&req->rq_snd_buf);
1844         if (rpc_encode_header(task, &xdr))
1845                 return;
1846
1847         task->tk_status = rpcauth_wrap_req(task, &xdr);
1848 }
1849
1850 /*
1851  * 3.   Encode arguments of an RPC call
1852  */
1853 static void
1854 call_encode(struct rpc_task *task)
1855 {
1856         if (!rpc_task_need_encode(task))
1857                 goto out;
1858
1859         /* Dequeue task from the receive queue while we're encoding */
1860         xprt_request_dequeue_xprt(task);
1861         /* Encode here so that rpcsec_gss can use correct sequence number. */
1862         rpc_xdr_encode(task);
1863         /* Did the encode result in an error condition? */
1864         if (task->tk_status != 0) {
1865                 /* Was the error nonfatal? */
1866                 switch (task->tk_status) {
1867                 case -EAGAIN:
1868                 case -ENOMEM:
1869                         rpc_delay(task, HZ >> 4);
1870                         break;
1871                 case -EKEYEXPIRED:
1872                         if (!task->tk_cred_retry) {
1873                                 rpc_exit(task, task->tk_status);
1874                         } else {
1875                                 task->tk_action = call_refresh;
1876                                 task->tk_cred_retry--;
1877                                 trace_rpc_retry_refresh_status(task);
1878                         }
1879                         break;
1880                 default:
1881                         rpc_call_rpcerror(task, task->tk_status);
1882                 }
1883                 return;
1884         }
1885
1886         /* Add task to reply queue before transmission to avoid races */
1887         if (rpc_reply_expected(task))
1888                 xprt_request_enqueue_receive(task);
1889         xprt_request_enqueue_transmit(task);
1890 out:
1891         task->tk_action = call_transmit;
1892         /* Check that the connection is OK */
1893         if (!xprt_bound(task->tk_xprt))
1894                 task->tk_action = call_bind;
1895         else if (!xprt_connected(task->tk_xprt))
1896                 task->tk_action = call_connect;
1897 }
1898
1899 /*
1900  * Helpers to check if the task was already transmitted, and
1901  * to take action when that is the case.
1902  */
1903 static bool
1904 rpc_task_transmitted(struct rpc_task *task)
1905 {
1906         return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1907 }
1908
1909 static void
1910 rpc_task_handle_transmitted(struct rpc_task *task)
1911 {
1912         xprt_end_transmit(task);
1913         task->tk_action = call_transmit_status;
1914 }
1915
1916 /*
1917  * 4.   Get the server port number if not yet set
1918  */
1919 static void
1920 call_bind(struct rpc_task *task)
1921 {
1922         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1923
1924         if (rpc_task_transmitted(task)) {
1925                 rpc_task_handle_transmitted(task);
1926                 return;
1927         }
1928
1929         if (xprt_bound(xprt)) {
1930                 task->tk_action = call_connect;
1931                 return;
1932         }
1933
1934         task->tk_action = call_bind_status;
1935         if (!xprt_prepare_transmit(task))
1936                 return;
1937
1938         xprt->ops->rpcbind(task);
1939 }
1940
/*
 * 4a.	Sort out bind result
 *
 * A non-negative tk_status, or a transport that turns out to be bound,
 * moves the task on to call_connect.  Any other result is either retried
 * through call_bind (subject to rpc_check_timeout()) or ends the call
 * with an error; the error reported defaults to -EIO.
 */
static void
call_bind_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
	int err = -EIO;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	if (task->tk_status >= 0) {
		task->tk_action = call_connect;
		return;
	}
	if (xprt_bound(xprt)) {
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	switch (task->tk_status) {
	case -ENOMEM:
	case -ENOBUFS:
		rpc_delay(task, HZ >> 2);
		goto retry;
	case -EACCES:
		trace_rpcb_prog_unavail_err(task);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			err = -EOPNOTSUPP;
			break;
		}
		/* out of rebind attempts: report the default error */
		if (task->tk_rebind_retry == 0)
			break;
		task->tk_rebind_retry--;
		rpc_delay(task, 3*HZ);
		goto retry;
	case -EAGAIN:
		goto retry;
	case -ETIMEDOUT:
		trace_rpcb_timeout_err(task);
		goto retry;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		trace_rpcb_bind_version_err(task);
		break;
	case -EPROTONOSUPPORT:
		trace_rpcb_bind_version_err(task);
		goto retry;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPIPE:
		trace_rpcb_unreachable_err(task);
		if (RPC_IS_SOFTCONN(task)) {
			/* soft-connect tasks get the transport error back */
			err = task->tk_status;
			break;
		}
		rpc_delay(task, 5*HZ);
		goto retry;
	default:
		trace_rpcb_unrecognized_err(task);
	}

	rpc_call_rpcerror(task, err);
	return;
retry:
	task->tk_status = 0;
	task->tk_action = call_bind;
	rpc_check_timeout(task);
}
2023
2024 /*
2025  * 4b.  Connect to the RPC server
2026  */
2027 static void
2028 call_connect(struct rpc_task *task)
2029 {
2030         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2031
2032         if (rpc_task_transmitted(task)) {
2033                 rpc_task_handle_transmitted(task);
2034                 return;
2035         }
2036
2037         if (xprt_connected(xprt)) {
2038                 task->tk_action = call_transmit;
2039                 return;
2040         }
2041
2042         task->tk_action = call_connect_status;
2043         if (task->tk_status < 0)
2044                 return;
2045         if (task->tk_flags & RPC_TASK_NOCONNECT) {
2046                 rpc_call_rpcerror(task, -ENOTCONN);
2047                 return;
2048         }
2049         if (!xprt_prepare_transmit(task))
2050                 return;
2051         xprt_connect(task);
2052 }
2053
/*
 * 4c.	Sort out connect result
 *
 * Success (or a transport that is connected after all) leads to
 * call_transmit.  Otherwise the saved connect error decides between
 * retrying through call_bind, restarting the task on another transport,
 * or failing the call with that error.
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_clnt *clnt = task->tk_client;
	int err = task->tk_status;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	trace_rpc_connect_status(task);

	if (task->tk_status == 0) {
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	}
	if (xprt_connected(xprt)) {
		task->tk_status = 0;
		task->tk_action = call_transmit;
		return;
	}

	/* The error now lives in 'err'; clear it from the task */
	task->tk_status = 0;
	switch (err) {
	case -ECONNREFUSED:
		/* A positive refusal suggests a rebind is needed. */
		if (RPC_IS_SOFTCONN(task))
			break;
		if (clnt->cl_autobind) {
			rpc_force_rebind(clnt);
			goto out_retry;
		}
		fallthrough;
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENETDOWN:
	case -ENETUNREACH:
	case -EHOSTUNREACH:
	case -EPIPE:
	case -EPROTO:
		xprt_conditional_disconnect(req->rq_xprt,
					    req->rq_connect_cookie);
		if (RPC_IS_SOFTCONN(task))
			break;
		/* retry with existing socket, after a delay */
		rpc_delay(task, 3*HZ);
		fallthrough;
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EAGAIN:
	case -ETIMEDOUT:
		/*
		 * A moveable task on a transport flagged XPRT_REMOVE is
		 * detached from that transport and restarted from
		 * call_start, provided the switch holds more than one
		 * transport.
		 */
		if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
		    (task->tk_flags & RPC_TASK_MOVEABLE) &&
		    test_bit(XPRT_REMOVE, &xprt->state)) {
			struct rpc_xprt *old_xprt = task->tk_xprt;
			struct rpc_xprt_switch *xps;

			rcu_read_lock();
			xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
			rcu_read_unlock();
			if (xps->xps_nxprts > 1) {
				xprt_release(task);
				/* Last queued user gone: drop it from the switch */
				if (atomic_long_dec_return(&xprt->queuelen) == 0)
					rpc_xprt_switch_remove_xprt(xps, old_xprt);
				xprt_put(old_xprt);
				task->tk_xprt = NULL;
				task->tk_action = call_start;
			}
			xprt_switch_put(xps);
			if (!task->tk_xprt)
				return;
		}
		goto out_retry;
	case -ENOBUFS:
		rpc_delay(task, HZ >> 2);
		goto out_retry;
	}
	rpc_call_rpcerror(task, err);
	return;
out_retry:
	/* Check for timeouts before looping back to call_bind */
	task->tk_action = call_bind;
	rpc_check_timeout(task);
}
2148
2149 /*
2150  * 5.   Transmit the RPC request, and wait for reply
2151  */
2152 static void
2153 call_transmit(struct rpc_task *task)
2154 {
2155         if (rpc_task_transmitted(task)) {
2156                 rpc_task_handle_transmitted(task);
2157                 return;
2158         }
2159
2160         task->tk_action = call_transmit_status;
2161         if (!xprt_prepare_transmit(task))
2162                 return;
2163         task->tk_status = 0;
2164         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2165                 if (!xprt_connected(task->tk_xprt)) {
2166                         task->tk_status = -ENOTCONN;
2167                         return;
2168                 }
2169                 xprt_transmit(task);
2170         }
2171         xprt_end_transmit(task);
2172 }
2173
/*
 * 5a.	Handle cleanup after a transmission
 *
 * On success the task waits for the reply and proceeds to call_status.
 * On failure tk_status selects the state to go back to (call_encode,
 * call_transmit or call_bind); unlisted errors are left in tk_status for
 * call_status.  rpc_check_timeout() runs on every failure path except
 * the soft-connect exit.
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (rpc_task_transmitted(task)) {
		task->tk_status = 0;
		xprt_request_wait_receive(task);
		return;
	}

	switch (task->tk_status) {
	case -EBADMSG:
		/* Re-encode the request from scratch */
		task->tk_status = 0;
		task->tk_action = call_encode;
		break;
		/*
		 * Special cases: if we've been waiting on the
		 * socket's write_space() callback, or if the
		 * socket just returned a connection error,
		 * then hold onto the transport lock.
		 */
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		fallthrough;
	case -EBADSLT:
	case -EAGAIN:
		task->tk_action = call_transmit;
		task->tk_status = 0;
		break;
	case -ECONNREFUSED:
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
		if (RPC_IS_SOFTCONN(task)) {
			/* Procedure 0 is traced as a ping */
			if (!task->tk_msg.rpc_proc->p_proc)
				trace_xprt_ping(task->tk_xprt,
						task->tk_status);
			rpc_call_rpcerror(task, task->tk_status);
			return;
		}
		fallthrough;
	case -ECONNRESET:
	case -ECONNABORTED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		task->tk_action = call_bind;
		task->tk_status = 0;
		break;
	default:
		break;
	}
	rpc_check_timeout(task);
}
2238
2239 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2240 static void call_bc_transmit(struct rpc_task *task);
2241 static void call_bc_transmit_status(struct rpc_task *task);
2242
2243 static void
2244 call_bc_encode(struct rpc_task *task)
2245 {
2246         xprt_request_enqueue_transmit(task);
2247         task->tk_action = call_bc_transmit;
2248 }
2249
2250 /*
2251  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2252  * addition, disconnect on connectivity errors.
2253  */
2254 static void
2255 call_bc_transmit(struct rpc_task *task)
2256 {
2257         task->tk_action = call_bc_transmit_status;
2258         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2259                 if (!xprt_prepare_transmit(task))
2260                         return;
2261                 task->tk_status = 0;
2262                 xprt_transmit(task);
2263         }
2264         xprt_end_transmit(task);
2265 }
2266
/*
 * Evaluate the result of sending a backchannel reply.
 *
 * -ENOBUFS, -EBADSLT and -EAGAIN send the task back to call_bc_transmit;
 * every other outcome ends the task via rpc_exit_task.
 */
static void
call_bc_transmit_status(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;

	if (rpc_task_transmitted(task))
		task->tk_status = 0;

	switch (task->tk_status) {
	case 0:
		/* Success */
	case -ENETDOWN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ECONNRESET:
	case -ECONNREFUSED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		/* Nothing to report for these: just finish the task */
		break;
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		fallthrough;
	case -EBADSLT:
	case -EAGAIN:
		/* Try the transmission again */
		task->tk_status = 0;
		task->tk_action = call_bc_transmit;
		return;
	case -ETIMEDOUT:
		/*
		 * Problem reaching the server.  Disconnect and let the
		 * forechannel reestablish the connection.  The server will
		 * have to retransmit the backchannel request and we'll
		 * reprocess it.  Since these ops are idempotent, there's no
		 * need to cache our reply at this time.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		xprt_conditional_disconnect(rqst->rq_xprt,
			rqst->rq_connect_cookie);
		break;
	default:
		/*
		 * We were unable to reply and will have to drop the
		 * request.  The server should reconnect and retransmit.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		break;
	}
	task->tk_action = rpc_exit_task;
}
2320 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2321
/*
 * 6.	Sort out the RPC call status
 *
 * A non-negative status moves on to call_decode.  Errors are either
 * retried through call_encode (after rpc_check_timeout(), except for
 * -ECONNRESET and -ECONNABORTED) or terminate the call with the error.
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int err;

	/* Procedure 0 is traced as a ping */
	if (!task->tk_msg.rpc_proc->p_proc)
		trace_xprt_ping(task->tk_xprt, task->tk_status);

	err = task->tk_status;
	if (err >= 0) {
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	task->tk_status = 0;
	switch (err) {
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
		if (RPC_IS_SOFTCONN(task))
			goto out_exit;
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		fallthrough;
	case -ETIMEDOUT:
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
		rpc_force_rebind(clnt);
		break;
	case -EADDRINUSE:
		rpc_delay(task, 3*HZ);
		fallthrough;
	case -EPIPE:
	case -EAGAIN:
		break;
	case -EIO:
		/* shutdown or soft timeout */
		goto out_exit;
	default:
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_program->name, -err);
		goto out_exit;
	}

	/* Retry: start over from the encode step */
	task->tk_action = call_encode;
	if (err != -ECONNRESET && err != -ECONNABORTED)
		rpc_check_timeout(task);
	return;
out_exit:
	rpc_call_rpcerror(task, err);
}
2386
2387 static bool
2388 rpc_check_connected(const struct rpc_rqst *req)
2389 {
2390         /* No allocated request or transport? return true */
2391         if (!req || !req->rq_xprt)
2392                 return true;
2393         return xprt_connected(req->rq_xprt);
2394 }
2395
2396 static void
2397 rpc_check_timeout(struct rpc_task *task)
2398 {
2399         struct rpc_clnt *clnt = task->tk_client;
2400
2401         if (RPC_SIGNALLED(task)) {
2402                 rpc_call_rpcerror(task, -ERESTARTSYS);
2403                 return;
2404         }
2405
2406         if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2407                 return;
2408
2409         trace_rpc_timeout_status(task);
2410         task->tk_timeouts++;
2411
2412         if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2413                 rpc_call_rpcerror(task, -ETIMEDOUT);
2414                 return;
2415         }
2416
2417         if (RPC_IS_SOFT(task)) {
2418                 /*
2419                  * Once a "no retrans timeout" soft tasks (a.k.a NFSv4) has
2420                  * been sent, it should time out only if the transport
2421                  * connection gets terminally broken.
2422                  */
2423                 if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2424                     rpc_check_connected(task->tk_rqstp))
2425                         return;
2426
2427                 if (clnt->cl_chatty) {
2428                         pr_notice_ratelimited(
2429                                 "%s: server %s not responding, timed out\n",
2430                                 clnt->cl_program->name,
2431                                 task->tk_xprt->servername);
2432                 }
2433                 if (task->tk_flags & RPC_TASK_TIMEOUT)
2434                         rpc_call_rpcerror(task, -ETIMEDOUT);
2435                 else
2436                         __rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2437                 return;
2438         }
2439
2440         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2441                 task->tk_flags |= RPC_CALL_MAJORSEEN;
2442                 if (clnt->cl_chatty) {
2443                         pr_notice_ratelimited(
2444                                 "%s: server %s not responding, still trying\n",
2445                                 clnt->cl_program->name,
2446                                 task->tk_xprt->servername);
2447                 }
2448         }
2449         rpc_force_rebind(clnt);
2450         /*
2451          * Did our request time out due to an RPCSEC_GSS out-of-sequence
2452          * event? RFC2203 requires the server to drop all such requests.
2453          */
2454         rpcauth_invalcred(task);
2455 }
2456
2457 /*
2458  * 7.   Decode the RPC reply
2459  */
2460 static void
2461 call_decode(struct rpc_task *task)
2462 {
2463         struct rpc_clnt *clnt = task->tk_client;
2464         struct rpc_rqst *req = task->tk_rqstp;
2465         struct xdr_stream xdr;
2466         int err;
2467
2468         if (!task->tk_msg.rpc_proc->p_decode) {
2469                 task->tk_action = rpc_exit_task;
2470                 return;
2471         }
2472
2473         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2474                 if (clnt->cl_chatty) {
2475                         pr_notice_ratelimited("%s: server %s OK\n",
2476                                 clnt->cl_program->name,
2477                                 task->tk_xprt->servername);
2478                 }
2479                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2480         }
2481
2482         /*
2483          * Did we ever call xprt_complete_rqst()? If not, we should assume
2484          * the message is incomplete.
2485          */
2486         err = -EAGAIN;
2487         if (!req->rq_reply_bytes_recvd)
2488                 goto out;
2489
2490         /* Ensure that we see all writes made by xprt_complete_rqst()
2491          * before it changed req->rq_reply_bytes_recvd.
2492          */
2493         smp_rmb();
2494
2495         req->rq_rcv_buf.len = req->rq_private_buf.len;
2496         trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2497
2498         /* Check that the softirq receive buffer is valid */
2499         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2500                                 sizeof(req->rq_rcv_buf)) != 0);
2501
2502         xdr_init_decode(&xdr, &req->rq_rcv_buf,
2503                         req->rq_rcv_buf.head[0].iov_base, req);
2504         err = rpc_decode_header(task, &xdr);
2505 out:
2506         switch (err) {
2507         case 0:
2508                 task->tk_action = rpc_exit_task;
2509                 task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2510                 return;
2511         case -EAGAIN:
2512                 task->tk_status = 0;
2513                 if (task->tk_client->cl_discrtry)
2514                         xprt_conditional_disconnect(req->rq_xprt,
2515                                                     req->rq_connect_cookie);
2516                 task->tk_action = call_encode;
2517                 rpc_check_timeout(task);
2518                 break;
2519         case -EKEYREJECTED:
2520                 task->tk_action = call_reserve;
2521                 rpc_check_timeout(task);
2522                 rpcauth_invalcred(task);
2523                 /* Ensure we obtain a new XID if we retry! */
2524                 xprt_release(task);
2525         }
2526 }
2527
2528 static int
2529 rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2530 {
2531         struct rpc_clnt *clnt = task->tk_client;
2532         struct rpc_rqst *req = task->tk_rqstp;
2533         __be32 *p;
2534         int error;
2535
2536         error = -EMSGSIZE;
2537         p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2538         if (!p)
2539                 goto out_fail;
2540         *p++ = req->rq_xid;
2541         *p++ = rpc_call;
2542         *p++ = cpu_to_be32(RPC_VERSION);
2543         *p++ = cpu_to_be32(clnt->cl_prog);
2544         *p++ = cpu_to_be32(clnt->cl_vers);
2545         *p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2546
2547         error = rpcauth_marshcred(task, xdr);
2548         if (error < 0)
2549                 goto out_fail;
2550         return 0;
2551 out_fail:
2552         trace_rpc_bad_callhdr(task);
2553         rpc_call_rpcerror(task, error);
2554         return error;
2555 }
2556
2557 static noinline int
2558 rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2559 {
2560         struct rpc_clnt *clnt = task->tk_client;
2561         int error;
2562         __be32 *p;
2563
2564         /* RFC-1014 says that the representation of XDR data must be a
2565          * multiple of four bytes
2566          * - if it isn't pointer subtraction in the NFS client may give
2567          *   undefined results
2568          */
2569         if (task->tk_rqstp->rq_rcv_buf.len & 3)
2570                 goto out_unparsable;
2571
2572         p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2573         if (!p)
2574                 goto out_unparsable;
2575         p++;    /* skip XID */
2576         if (*p++ != rpc_reply)
2577                 goto out_unparsable;
2578         if (*p++ != rpc_msg_accepted)
2579                 goto out_msg_denied;
2580
2581         error = rpcauth_checkverf(task, xdr);
2582         if (error)
2583                 goto out_verifier;
2584
2585         p = xdr_inline_decode(xdr, sizeof(*p));
2586         if (!p)
2587                 goto out_unparsable;
2588         switch (*p) {
2589         case rpc_success:
2590                 return 0;
2591         case rpc_prog_unavail:
2592                 trace_rpc__prog_unavail(task);
2593                 error = -EPFNOSUPPORT;
2594                 goto out_err;
2595         case rpc_prog_mismatch:
2596                 trace_rpc__prog_mismatch(task);
2597                 error = -EPROTONOSUPPORT;
2598                 goto out_err;
2599         case rpc_proc_unavail:
2600                 trace_rpc__proc_unavail(task);
2601                 error = -EOPNOTSUPP;
2602                 goto out_err;
2603         case rpc_garbage_args:
2604         case rpc_system_err:
2605                 trace_rpc__garbage_args(task);
2606                 error = -EIO;
2607                 break;
2608         default:
2609                 goto out_unparsable;
2610         }
2611
2612 out_garbage:
2613         clnt->cl_stats->rpcgarbage++;
2614         if (task->tk_garb_retry) {
2615                 task->tk_garb_retry--;
2616                 task->tk_action = call_encode;
2617                 return -EAGAIN;
2618         }
2619 out_err:
2620         rpc_call_rpcerror(task, error);
2621         return error;
2622
2623 out_unparsable:
2624         trace_rpc__unparsable(task);
2625         error = -EIO;
2626         goto out_garbage;
2627
2628 out_verifier:
2629         trace_rpc_bad_verifier(task);
2630         goto out_garbage;
2631
2632 out_msg_denied:
2633         error = -EACCES;
2634         p = xdr_inline_decode(xdr, sizeof(*p));
2635         if (!p)
2636                 goto out_unparsable;
2637         switch (*p++) {
2638         case rpc_auth_error:
2639                 break;
2640         case rpc_mismatch:
2641                 trace_rpc__mismatch(task);
2642                 error = -EPROTONOSUPPORT;
2643                 goto out_err;
2644         default:
2645                 goto out_unparsable;
2646         }
2647
2648         p = xdr_inline_decode(xdr, sizeof(*p));
2649         if (!p)
2650                 goto out_unparsable;
2651         switch (*p++) {
2652         case rpc_autherr_rejectedcred:
2653         case rpc_autherr_rejectedverf:
2654         case rpcsec_gsserr_credproblem:
2655         case rpcsec_gsserr_ctxproblem:
2656                 if (!task->tk_cred_retry)
2657                         break;
2658                 task->tk_cred_retry--;
2659                 trace_rpc__stale_creds(task);
2660                 return -EKEYREJECTED;
2661         case rpc_autherr_badcred:
2662         case rpc_autherr_badverf:
2663                 /* possibly garbled cred/verf? */
2664                 if (!task->tk_garb_retry)
2665                         break;
2666                 task->tk_garb_retry--;
2667                 trace_rpc__bad_creds(task);
2668                 task->tk_action = call_encode;
2669                 return -EAGAIN;
2670         case rpc_autherr_tooweak:
2671                 trace_rpc__auth_tooweak(task);
2672                 pr_warn("RPC: server %s requires stronger authentication.\n",
2673                         task->tk_xprt->servername);
2674                 break;
2675         default:
2676                 goto out_unparsable;
2677         }
2678         goto out_err;
2679 }
2680
/* Argument encoder of the built-in null procedure: emits nothing. */
static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
		const void *obj)
{
	/* intentionally empty */
}
2685
/*
 * Result decoder of the built-in null procedure: consumes nothing and
 * always returns 0.
 */
static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
		void *obj)
{
	return 0;
}
2691
2692 static const struct rpc_procinfo rpcproc_null = {
2693         .p_encode = rpcproc_encode_null,
2694         .p_decode = rpcproc_decode_null,
2695 };
2696
2697 static int rpc_ping(struct rpc_clnt *clnt)
2698 {
2699         struct rpc_message msg = {
2700                 .rpc_proc = &rpcproc_null,
2701         };
2702         int err;
2703         err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2704                             RPC_TASK_NULLCREDS);
2705         return err;
2706 }
2707
2708 static
2709 struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2710                 struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2711                 const struct rpc_call_ops *ops, void *data)
2712 {
2713         struct rpc_message msg = {
2714                 .rpc_proc = &rpcproc_null,
2715         };
2716         struct rpc_task_setup task_setup_data = {
2717                 .rpc_client = clnt,
2718                 .rpc_xprt = xprt,
2719                 .rpc_message = &msg,
2720                 .rpc_op_cred = cred,
2721                 .callback_ops = (ops != NULL) ? ops : &rpc_default_ops,
2722                 .callback_data = data,
2723                 .flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2724                          RPC_TASK_NULLCREDS,
2725         };
2726
2727         return rpc_run_task(&task_setup_data);
2728 }
2729
2730 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2731 {
2732         return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2733 }
2734 EXPORT_SYMBOL_GPL(rpc_call_null);
2735
/*
 * Callback data for the asynchronous "test and add transport" null call.
 * Both references are dropped by rpc_cb_add_xprt_release().
 */
struct rpc_cb_add_xprt_calldata {
	struct rpc_xprt_switch *xps;	/* switch the transport is added to */
	struct rpc_xprt *xprt;		/* transport being tested */
};
2740
2741 static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2742 {
2743         struct rpc_cb_add_xprt_calldata *data = calldata;
2744
2745         if (task->tk_status == 0)
2746                 rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2747 }
2748
2749 static void rpc_cb_add_xprt_release(void *calldata)
2750 {
2751         struct rpc_cb_add_xprt_calldata *data = calldata;
2752
2753         xprt_put(data->xprt);
2754         xprt_switch_put(data->xps);
2755         kfree(data);
2756 }
2757
2758 static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2759         .rpc_call_done = rpc_cb_add_xprt_done,
2760         .rpc_release = rpc_cb_add_xprt_release,
2761 };
2762
2763 /**
2764  * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2765  * @clnt: pointer to struct rpc_clnt
2766  * @xps: pointer to struct rpc_xprt_switch,
2767  * @xprt: pointer struct rpc_xprt
2768  * @dummy: unused
2769  */
2770 int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2771                 struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2772                 void *dummy)
2773 {
2774         struct rpc_cb_add_xprt_calldata *data;
2775         struct rpc_task *task;
2776
2777         data = kmalloc(sizeof(*data), GFP_NOFS);
2778         if (!data)
2779                 return -ENOMEM;
2780         data->xps = xprt_switch_get(xps);
2781         data->xprt = xprt_get(xprt);
2782         if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2783                 rpc_cb_add_xprt_release(data);
2784                 goto success;
2785         }
2786
2787         task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2788                         &rpc_cb_add_xprt_call_ops, data);
2789
2790         rpc_put_task(task);
2791 success:
2792         return 1;
2793 }
2794 EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2795
2796 /**
2797  * rpc_clnt_setup_test_and_add_xprt()
2798  *
2799  * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
2800  *   1) caller of the test function must dereference the rpc_xprt_switch
2801  *   and the rpc_xprt.
2802  *   2) test function must call rpc_xprt_switch_add_xprt, usually in
2803  *   the rpc_call_done routine.
2804  *
2805  * Upon success (return of 1), the test function adds the new
2806  * transport to the rpc_clnt xprt switch
2807  *
2808  * @clnt: struct rpc_clnt to get the new transport
2809  * @xps:  the rpc_xprt_switch to hold the new transport
2810  * @xprt: the rpc_xprt to test
2811  * @data: a struct rpc_add_xprt_test pointer that holds the test function
2812  *        and test function call data
2813  */
2814 int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2815                                      struct rpc_xprt_switch *xps,
2816                                      struct rpc_xprt *xprt,
2817                                      void *data)
2818 {
2819         struct rpc_task *task;
2820         struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
2821         int status = -EADDRINUSE;
2822
2823         xprt = xprt_get(xprt);
2824         xprt_switch_get(xps);
2825
2826         if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2827                 goto out_err;
2828
2829         /* Test the connection */
2830         task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
2831         if (IS_ERR(task)) {
2832                 status = PTR_ERR(task);
2833                 goto out_err;
2834         }
2835         status = task->tk_status;
2836         rpc_put_task(task);
2837
2838         if (status < 0)
2839                 goto out_err;
2840
2841         /* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
2842         xtest->add_xprt_test(clnt, xprt, xtest->data);
2843
2844         xprt_put(xprt);
2845         xprt_switch_put(xps);
2846
2847         /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2848         return 1;
2849 out_err:
2850         xprt_put(xprt);
2851         xprt_switch_put(xps);
2852         pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
2853                 status, xprt->address_strings[RPC_DISPLAY_ADDR]);
2854         return status;
2855 }
2856 EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
2857
2858 /**
2859  * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2860  * @clnt: pointer to struct rpc_clnt
2861  * @xprtargs: pointer to struct xprt_create
2862  * @setup: callback to test and/or set up the connection
2863  * @data: pointer to setup function data
2864  *
2865  * Creates a new transport using the parameters set in args and
2866  * adds it to clnt.
2867  * If ping is set, then test that connectivity succeeds before
2868  * adding the new transport.
2869  *
2870  */
2871 int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2872                 struct xprt_create *xprtargs,
2873                 int (*setup)(struct rpc_clnt *,
2874                         struct rpc_xprt_switch *,
2875                         struct rpc_xprt *,
2876                         void *),
2877                 void *data)
2878 {
2879         struct rpc_xprt_switch *xps;
2880         struct rpc_xprt *xprt;
2881         unsigned long connect_timeout;
2882         unsigned long reconnect_timeout;
2883         unsigned char resvport, reuseport;
2884         int ret = 0;
2885
2886         rcu_read_lock();
2887         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2888         xprt = xprt_iter_xprt(&clnt->cl_xpi);
2889         if (xps == NULL || xprt == NULL) {
2890                 rcu_read_unlock();
2891                 xprt_switch_put(xps);
2892                 return -EAGAIN;
2893         }
2894         resvport = xprt->resvport;
2895         reuseport = xprt->reuseport;
2896         connect_timeout = xprt->connect_timeout;
2897         reconnect_timeout = xprt->max_reconnect_timeout;
2898         rcu_read_unlock();
2899
2900         xprt = xprt_create_transport(xprtargs);
2901         if (IS_ERR(xprt)) {
2902                 ret = PTR_ERR(xprt);
2903                 goto out_put_switch;
2904         }
2905         xprt->resvport = resvport;
2906         xprt->reuseport = reuseport;
2907         if (xprt->ops->set_connect_timeout != NULL)
2908                 xprt->ops->set_connect_timeout(xprt,
2909                                 connect_timeout,
2910                                 reconnect_timeout);
2911
2912         rpc_xprt_switch_set_roundrobin(xps);
2913         if (setup) {
2914                 ret = setup(clnt, xps, xprt, data);
2915                 if (ret != 0)
2916                         goto out_put_xprt;
2917         }
2918         rpc_xprt_switch_add_xprt(xps, xprt);
2919 out_put_xprt:
2920         xprt_put(xprt);
2921 out_put_switch:
2922         xprt_switch_put(xps);
2923         return ret;
2924 }
2925 EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
2926
/*
 * Argument bundle that rpc_set_connect_timeout() passes to
 * rpc_xprt_set_connect_timeout() for each transport.
 */
struct connect_timeout_data {
	unsigned long connect_timeout;		/* first argument after xprt */
	unsigned long reconnect_timeout;	/* second argument after xprt */
};
2931
2932 static int
2933 rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
2934                 struct rpc_xprt *xprt,
2935                 void *data)
2936 {
2937         struct connect_timeout_data *timeo = data;
2938
2939         if (xprt->ops->set_connect_timeout)
2940                 xprt->ops->set_connect_timeout(xprt,
2941                                 timeo->connect_timeout,
2942                                 timeo->reconnect_timeout);
2943         return 0;
2944 }
2945
2946 void
2947 rpc_set_connect_timeout(struct rpc_clnt *clnt,
2948                 unsigned long connect_timeout,
2949                 unsigned long reconnect_timeout)
2950 {
2951         struct connect_timeout_data timeout = {
2952                 .connect_timeout = connect_timeout,
2953                 .reconnect_timeout = reconnect_timeout,
2954         };
2955         rpc_clnt_iterate_for_each_xprt(clnt,
2956                         rpc_xprt_set_connect_timeout,
2957                         &timeout);
2958 }
2959 EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
2960
2961 void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
2962 {
2963         rcu_read_lock();
2964         xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2965         rcu_read_unlock();
2966 }
2967 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
2968
2969 void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
2970 {
2971         rcu_read_lock();
2972         rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
2973                                  xprt);
2974         rcu_read_unlock();
2975 }
2976 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
2977
2978 bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
2979                                    const struct sockaddr *sap)
2980 {
2981         struct rpc_xprt_switch *xps;
2982         bool ret;
2983
2984         rcu_read_lock();
2985         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
2986         ret = rpc_xprt_switch_has_addr(xps, sap);
2987         rcu_read_unlock();
2988         return ret;
2989 }
2990 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
2991
2992 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
2993 static void rpc_show_header(void)
2994 {
2995         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2996                 "-timeout ---ops--\n");
2997 }
2998
2999 static void rpc_show_task(const struct rpc_clnt *clnt,
3000                           const struct rpc_task *task)
3001 {
3002         const char *rpc_waitq = "none";
3003
3004         if (RPC_IS_QUEUED(task))
3005                 rpc_waitq = rpc_qname(task->tk_waitqueue);
3006
3007         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
3008                 task->tk_pid, task->tk_flags, task->tk_status,
3009                 clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
3010                 clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
3011                 task->tk_action, rpc_waitq);
3012 }
3013
3014 void rpc_show_tasks(struct net *net)
3015 {
3016         struct rpc_clnt *clnt;
3017         struct rpc_task *task;
3018         int header = 0;
3019         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
3020
3021         spin_lock(&sn->rpc_client_lock);
3022         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
3023                 spin_lock(&clnt->cl_lock);
3024                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3025                         if (!header) {
3026                                 rpc_show_header();
3027                                 header++;
3028                         }
3029                         rpc_show_task(clnt, task);
3030                 }
3031                 spin_unlock(&clnt->cl_lock);
3032         }
3033         spin_unlock(&sn->rpc_client_lock);
3034 }
3035 #endif
3036
3037 #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
/*
 * Per-transport callback for rpc_clnt_swap_activate(): returns the
 * result of xprt_enable_swap() for @xprt.  @clnt and @dummy are unused.
 */
static int
rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt, struct rpc_xprt *xprt,
				void *dummy)
{
	return xprt_enable_swap(xprt);
}
3045
3046 int
3047 rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3048 {
3049         if (atomic_inc_return(&clnt->cl_swapper) == 1)
3050                 return rpc_clnt_iterate_for_each_xprt(clnt,
3051                                 rpc_clnt_swap_activate_callback, NULL);
3052         return 0;
3053 }
3054 EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3055
/*
 * Per-transport callback for rpc_clnt_swap_deactivate(): calls
 * xprt_disable_swap() on @xprt and always returns 0.  @clnt and @dummy
 * are unused.
 */
static int
rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt, struct rpc_xprt *xprt,
				  void *dummy)
{
	xprt_disable_swap(xprt);
	return 0;
}
3064
3065 void
3066 rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3067 {
3068         if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3069                 rpc_clnt_iterate_for_each_xprt(clnt,
3070                                 rpc_clnt_swap_deactivate_callback, NULL);
3071 }
3072 EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
3073 #endif /* CONFIG_SUNRPC_SWAP */