sunrpc: Create per-rpc_clnt sysfs kobjects
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  *  linux/net/sunrpc/clnt.c
4  *
5  *  This file contains the high-level RPC interface.
6  *  It is modeled as a finite state machine to support both synchronous
7  *  and asynchronous requests.
8  *
9  *  -   RPC header generation and argument serialization.
10  *  -   Credential refresh.
11  *  -   TCP connect handling.
12  *  -   Retry of operation when it is suspected the operation failed because
13  *      of uid squashing on the server, or when the credentials were stale
14  *      and need to be refreshed, or when a packet was damaged in transit.
15  *      This may have to be moved to the VFS layer.
16  *
17  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
18  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
19  */
20
21
22 #include <linux/module.h>
23 #include <linux/types.h>
24 #include <linux/kallsyms.h>
25 #include <linux/mm.h>
26 #include <linux/namei.h>
27 #include <linux/mount.h>
28 #include <linux/slab.h>
29 #include <linux/rcupdate.h>
30 #include <linux/utsname.h>
31 #include <linux/workqueue.h>
32 #include <linux/in.h>
33 #include <linux/in6.h>
34 #include <linux/un.h>
35
36 #include <linux/sunrpc/clnt.h>
37 #include <linux/sunrpc/addr.h>
38 #include <linux/sunrpc/rpc_pipe_fs.h>
39 #include <linux/sunrpc/metrics.h>
40 #include <linux/sunrpc/bc_xprt.h>
41 #include <trace/events/sunrpc.h>
42
43 #include "sunrpc.h"
44 #include "sysfs.h"
45 #include "netns.h"
46
47 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
48 # define RPCDBG_FACILITY        RPCDBG_CALL
49 #endif
50
51 /*
52  * All RPC clients are linked into this list
53  */
54
55 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
56
57
58 static void     call_start(struct rpc_task *task);
59 static void     call_reserve(struct rpc_task *task);
60 static void     call_reserveresult(struct rpc_task *task);
61 static void     call_allocate(struct rpc_task *task);
62 static void     call_encode(struct rpc_task *task);
63 static void     call_decode(struct rpc_task *task);
64 static void     call_bind(struct rpc_task *task);
65 static void     call_bind_status(struct rpc_task *task);
66 static void     call_transmit(struct rpc_task *task);
67 static void     call_status(struct rpc_task *task);
68 static void     call_transmit_status(struct rpc_task *task);
69 static void     call_refresh(struct rpc_task *task);
70 static void     call_refreshresult(struct rpc_task *task);
71 static void     call_connect(struct rpc_task *task);
72 static void     call_connect_status(struct rpc_task *task);
73
74 static int      rpc_encode_header(struct rpc_task *task,
75                                   struct xdr_stream *xdr);
76 static int      rpc_decode_header(struct rpc_task *task,
77                                   struct xdr_stream *xdr);
78 static int      rpc_ping(struct rpc_clnt *clnt);
79 static void     rpc_check_timeout(struct rpc_task *task);
80
81 static void rpc_register_client(struct rpc_clnt *clnt)
82 {
83         struct net *net = rpc_net_ns(clnt);
84         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
85
86         spin_lock(&sn->rpc_client_lock);
87         list_add(&clnt->cl_clients, &sn->all_clients);
88         spin_unlock(&sn->rpc_client_lock);
89 }
90
91 static void rpc_unregister_client(struct rpc_clnt *clnt)
92 {
93         struct net *net = rpc_net_ns(clnt);
94         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
95
96         spin_lock(&sn->rpc_client_lock);
97         list_del(&clnt->cl_clients);
98         spin_unlock(&sn->rpc_client_lock);
99 }
100
101 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
102 {
103         rpc_remove_client_dir(clnt);
104 }
105
106 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
107 {
108         struct net *net = rpc_net_ns(clnt);
109         struct super_block *pipefs_sb;
110
111         pipefs_sb = rpc_get_sb_net(net);
112         if (pipefs_sb) {
113                 __rpc_clnt_remove_pipedir(clnt);
114                 rpc_put_sb_net(net);
115         }
116 }
117
118 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
119                                     struct rpc_clnt *clnt)
120 {
121         static uint32_t clntid;
122         const char *dir_name = clnt->cl_program->pipe_dir_name;
123         char name[15];
124         struct dentry *dir, *dentry;
125
126         dir = rpc_d_lookup_sb(sb, dir_name);
127         if (dir == NULL) {
128                 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
129                 return dir;
130         }
131         for (;;) {
132                 snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
133                 name[sizeof(name) - 1] = '\0';
134                 dentry = rpc_create_client_dir(dir, name, clnt);
135                 if (!IS_ERR(dentry))
136                         break;
137                 if (dentry == ERR_PTR(-EEXIST))
138                         continue;
139                 printk(KERN_INFO "RPC: Couldn't create pipefs entry"
140                                 " %s/%s, error %ld\n",
141                                 dir_name, name, PTR_ERR(dentry));
142                 break;
143         }
144         dput(dir);
145         return dentry;
146 }
147
148 static int
149 rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
150 {
151         struct dentry *dentry;
152
153         if (clnt->cl_program->pipe_dir_name != NULL) {
154                 dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
155                 if (IS_ERR(dentry))
156                         return PTR_ERR(dentry);
157         }
158         return 0;
159 }
160
161 static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
162 {
163         if (clnt->cl_program->pipe_dir_name == NULL)
164                 return 1;
165
166         switch (event) {
167         case RPC_PIPEFS_MOUNT:
168                 if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
169                         return 1;
170                 if (atomic_read(&clnt->cl_count) == 0)
171                         return 1;
172                 break;
173         case RPC_PIPEFS_UMOUNT:
174                 if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
175                         return 1;
176                 break;
177         }
178         return 0;
179 }
180
181 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
182                                    struct super_block *sb)
183 {
184         struct dentry *dentry;
185
186         switch (event) {
187         case RPC_PIPEFS_MOUNT:
188                 dentry = rpc_setup_pipedir_sb(sb, clnt);
189                 if (!dentry)
190                         return -ENOENT;
191                 if (IS_ERR(dentry))
192                         return PTR_ERR(dentry);
193                 break;
194         case RPC_PIPEFS_UMOUNT:
195                 __rpc_clnt_remove_pipedir(clnt);
196                 break;
197         default:
198                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
199                 return -ENOTSUPP;
200         }
201         return 0;
202 }
203
204 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
205                                 struct super_block *sb)
206 {
207         int error = 0;
208
209         for (;; clnt = clnt->cl_parent) {
210                 if (!rpc_clnt_skip_event(clnt, event))
211                         error = __rpc_clnt_handle_event(clnt, event, sb);
212                 if (error || clnt == clnt->cl_parent)
213                         break;
214         }
215         return error;
216 }
217
218 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
219 {
220         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
221         struct rpc_clnt *clnt;
222
223         spin_lock(&sn->rpc_client_lock);
224         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
225                 if (rpc_clnt_skip_event(clnt, event))
226                         continue;
227                 spin_unlock(&sn->rpc_client_lock);
228                 return clnt;
229         }
230         spin_unlock(&sn->rpc_client_lock);
231         return NULL;
232 }
233
234 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
235                             void *ptr)
236 {
237         struct super_block *sb = ptr;
238         struct rpc_clnt *clnt;
239         int error = 0;
240
241         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
242                 error = __rpc_pipefs_event(clnt, event, sb);
243                 if (error)
244                         break;
245         }
246         return error;
247 }
248
249 static struct notifier_block rpc_clients_block = {
250         .notifier_call  = rpc_pipefs_event,
251         .priority       = SUNRPC_PIPEFS_RPC_PRIO,
252 };
253
254 int rpc_clients_notifier_register(void)
255 {
256         return rpc_pipefs_notifier_register(&rpc_clients_block);
257 }
258
259 void rpc_clients_notifier_unregister(void)
260 {
261         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
262 }
263
264 static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
265                 struct rpc_xprt *xprt,
266                 const struct rpc_timeout *timeout)
267 {
268         struct rpc_xprt *old;
269
270         spin_lock(&clnt->cl_lock);
271         old = rcu_dereference_protected(clnt->cl_xprt,
272                         lockdep_is_held(&clnt->cl_lock));
273
274         if (!xprt_bound(xprt))
275                 clnt->cl_autobind = 1;
276
277         clnt->cl_timeout = timeout;
278         rcu_assign_pointer(clnt->cl_xprt, xprt);
279         spin_unlock(&clnt->cl_lock);
280
281         return old;
282 }
283
284 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
285 {
286         clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
287                         nodename, sizeof(clnt->cl_nodename));
288 }
289
290 static int rpc_client_register(struct rpc_clnt *clnt,
291                                rpc_authflavor_t pseudoflavor,
292                                const char *client_name)
293 {
294         struct rpc_auth_create_args auth_args = {
295                 .pseudoflavor = pseudoflavor,
296                 .target_name = client_name,
297         };
298         struct rpc_auth *auth;
299         struct net *net = rpc_net_ns(clnt);
300         struct super_block *pipefs_sb;
301         int err;
302
303         rpc_clnt_debugfs_register(clnt);
304         rpc_sysfs_client_setup(clnt, net);
305
306         pipefs_sb = rpc_get_sb_net(net);
307         if (pipefs_sb) {
308                 err = rpc_setup_pipedir(pipefs_sb, clnt);
309                 if (err)
310                         goto out;
311         }
312
313         rpc_register_client(clnt);
314         if (pipefs_sb)
315                 rpc_put_sb_net(net);
316
317         auth = rpcauth_create(&auth_args, clnt);
318         if (IS_ERR(auth)) {
319                 dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
320                                 pseudoflavor);
321                 err = PTR_ERR(auth);
322                 goto err_auth;
323         }
324         return 0;
325 err_auth:
326         pipefs_sb = rpc_get_sb_net(net);
327         rpc_unregister_client(clnt);
328         __rpc_clnt_remove_pipedir(clnt);
329 out:
330         if (pipefs_sb)
331                 rpc_put_sb_net(net);
332         rpc_sysfs_client_destroy(clnt);
333         rpc_clnt_debugfs_unregister(clnt);
334         return err;
335 }
336
337 static DEFINE_IDA(rpc_clids);
338
339 void rpc_cleanup_clids(void)
340 {
341         ida_destroy(&rpc_clids);
342 }
343
344 static int rpc_alloc_clid(struct rpc_clnt *clnt)
345 {
346         int clid;
347
348         clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
349         if (clid < 0)
350                 return clid;
351         clnt->cl_clid = clid;
352         return 0;
353 }
354
355 static void rpc_free_clid(struct rpc_clnt *clnt)
356 {
357         ida_simple_remove(&rpc_clids, clnt->cl_clid);
358 }
359
360 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
361                 struct rpc_xprt_switch *xps,
362                 struct rpc_xprt *xprt,
363                 struct rpc_clnt *parent)
364 {
365         const struct rpc_program *program = args->program;
366         const struct rpc_version *version;
367         struct rpc_clnt *clnt = NULL;
368         const struct rpc_timeout *timeout;
369         const char *nodename = args->nodename;
370         int err;
371
372         err = rpciod_up();
373         if (err)
374                 goto out_no_rpciod;
375
376         err = -EINVAL;
377         if (args->version >= program->nrvers)
378                 goto out_err;
379         version = program->version[args->version];
380         if (version == NULL)
381                 goto out_err;
382
383         err = -ENOMEM;
384         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
385         if (!clnt)
386                 goto out_err;
387         clnt->cl_parent = parent ? : clnt;
388
389         err = rpc_alloc_clid(clnt);
390         if (err)
391                 goto out_no_clid;
392
393         clnt->cl_cred     = get_cred(args->cred);
394         clnt->cl_procinfo = version->procs;
395         clnt->cl_maxproc  = version->nrprocs;
396         clnt->cl_prog     = args->prognumber ? : program->number;
397         clnt->cl_vers     = version->number;
398         clnt->cl_stats    = program->stats;
399         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
400         rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
401         err = -ENOMEM;
402         if (clnt->cl_metrics == NULL)
403                 goto out_no_stats;
404         clnt->cl_program  = program;
405         INIT_LIST_HEAD(&clnt->cl_tasks);
406         spin_lock_init(&clnt->cl_lock);
407
408         timeout = xprt->timeout;
409         if (args->timeout != NULL) {
410                 memcpy(&clnt->cl_timeout_default, args->timeout,
411                                 sizeof(clnt->cl_timeout_default));
412                 timeout = &clnt->cl_timeout_default;
413         }
414
415         rpc_clnt_set_transport(clnt, xprt, timeout);
416         xprt_iter_init(&clnt->cl_xpi, xps);
417         xprt_switch_put(xps);
418
419         clnt->cl_rtt = &clnt->cl_rtt_default;
420         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
421
422         atomic_set(&clnt->cl_count, 1);
423
424         if (nodename == NULL)
425                 nodename = utsname()->nodename;
426         /* save the nodename */
427         rpc_clnt_set_nodename(clnt, nodename);
428
429         err = rpc_client_register(clnt, args->authflavor, args->client_name);
430         if (err)
431                 goto out_no_path;
432         if (parent)
433                 atomic_inc(&parent->cl_count);
434
435         trace_rpc_clnt_new(clnt, xprt, program->name, args->servername);
436         return clnt;
437
438 out_no_path:
439         rpc_free_iostats(clnt->cl_metrics);
440 out_no_stats:
441         put_cred(clnt->cl_cred);
442         rpc_free_clid(clnt);
443 out_no_clid:
444         kfree(clnt);
445 out_err:
446         rpciod_down();
447 out_no_rpciod:
448         xprt_switch_put(xps);
449         xprt_put(xprt);
450         trace_rpc_clnt_new_err(program->name, args->servername, err);
451         return ERR_PTR(err);
452 }
453
454 static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
455                                         struct rpc_xprt *xprt)
456 {
457         struct rpc_clnt *clnt = NULL;
458         struct rpc_xprt_switch *xps;
459
460         if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
461                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
462                 xps = args->bc_xprt->xpt_bc_xps;
463                 xprt_switch_get(xps);
464         } else {
465                 xps = xprt_switch_alloc(xprt, GFP_KERNEL);
466                 if (xps == NULL) {
467                         xprt_put(xprt);
468                         return ERR_PTR(-ENOMEM);
469                 }
470                 if (xprt->bc_xprt) {
471                         xprt_switch_get(xps);
472                         xprt->bc_xprt->xpt_bc_xps = xps;
473                 }
474         }
475         clnt = rpc_new_client(args, xps, xprt, NULL);
476         if (IS_ERR(clnt))
477                 return clnt;
478
479         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
480                 int err = rpc_ping(clnt);
481                 if (err != 0) {
482                         rpc_shutdown_client(clnt);
483                         return ERR_PTR(err);
484                 }
485         }
486
487         clnt->cl_softrtry = 1;
488         if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
489                 clnt->cl_softrtry = 0;
490                 if (args->flags & RPC_CLNT_CREATE_SOFTERR)
491                         clnt->cl_softerr = 1;
492         }
493
494         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
495                 clnt->cl_autobind = 1;
496         if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
497                 clnt->cl_noretranstimeo = 1;
498         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
499                 clnt->cl_discrtry = 1;
500         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
501                 clnt->cl_chatty = 1;
502
503         return clnt;
504 }
505
506 /**
507  * rpc_create - create an RPC client and transport with one call
508  * @args: rpc_clnt create argument structure
509  *
510  * Creates and initializes an RPC transport and an RPC client.
511  *
512  * It can ping the server in order to determine if it is up, and to see if
513  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
514  * this behavior so asynchronous tasks can also use rpc_create.
515  */
516 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
517 {
518         struct rpc_xprt *xprt;
519         struct xprt_create xprtargs = {
520                 .net = args->net,
521                 .ident = args->protocol,
522                 .srcaddr = args->saddress,
523                 .dstaddr = args->address,
524                 .addrlen = args->addrsize,
525                 .servername = args->servername,
526                 .bc_xprt = args->bc_xprt,
527         };
528         char servername[48];
529         struct rpc_clnt *clnt;
530         int i;
531
532         if (args->bc_xprt) {
533                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
534                 xprt = args->bc_xprt->xpt_bc_xprt;
535                 if (xprt) {
536                         xprt_get(xprt);
537                         return rpc_create_xprt(args, xprt);
538                 }
539         }
540
541         if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
542                 xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
543         if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
544                 xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
545         /*
546          * If the caller chooses not to specify a hostname, whip
547          * up a string representation of the passed-in address.
548          */
549         if (xprtargs.servername == NULL) {
550                 struct sockaddr_un *sun =
551                                 (struct sockaddr_un *)args->address;
552                 struct sockaddr_in *sin =
553                                 (struct sockaddr_in *)args->address;
554                 struct sockaddr_in6 *sin6 =
555                                 (struct sockaddr_in6 *)args->address;
556
557                 servername[0] = '\0';
558                 switch (args->address->sa_family) {
559                 case AF_LOCAL:
560                         snprintf(servername, sizeof(servername), "%s",
561                                  sun->sun_path);
562                         break;
563                 case AF_INET:
564                         snprintf(servername, sizeof(servername), "%pI4",
565                                  &sin->sin_addr.s_addr);
566                         break;
567                 case AF_INET6:
568                         snprintf(servername, sizeof(servername), "%pI6",
569                                  &sin6->sin6_addr);
570                         break;
571                 default:
572                         /* caller wants default server name, but
573                          * address family isn't recognized. */
574                         return ERR_PTR(-EINVAL);
575                 }
576                 xprtargs.servername = servername;
577         }
578
579         xprt = xprt_create_transport(&xprtargs);
580         if (IS_ERR(xprt))
581                 return (struct rpc_clnt *)xprt;
582
583         /*
584          * By default, kernel RPC client connects from a reserved port.
585          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
586          * but it is always enabled for rpciod, which handles the connect
587          * operation.
588          */
589         xprt->resvport = 1;
590         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
591                 xprt->resvport = 0;
592         xprt->reuseport = 0;
593         if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
594                 xprt->reuseport = 1;
595
596         clnt = rpc_create_xprt(args, xprt);
597         if (IS_ERR(clnt) || args->nconnect <= 1)
598                 return clnt;
599
600         for (i = 0; i < args->nconnect - 1; i++) {
601                 if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
602                         break;
603         }
604         return clnt;
605 }
606 EXPORT_SYMBOL_GPL(rpc_create);
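/*
 * Editor's note: the block below is an illustrative usage sketch, not part
 * of the original file.  It shows how a kernel RPC consumer might fill in
 * struct rpc_create_args; "example_program", the server name string and the
 * caller-supplied sockaddr are hypothetical, and real users (NFS, rpcbind,
 * lockd) set up considerably more state.
 */
static struct rpc_clnt *example_create_client(struct net *net,
                                struct sockaddr_in *sin,
                                const struct rpc_program *example_program)
{
        struct rpc_create_args args = {
                .net            = net,
                .protocol       = XPRT_TRANSPORT_TCP,
                .address        = (struct sockaddr *)sin,
                .addrsize       = sizeof(*sin),
                .servername     = "example-server",       /* hypothetical */
                .program        = example_program,
                .version        = 0,
                .authflavor     = RPC_AUTH_UNIX,
                .flags          = RPC_CLNT_CREATE_NOPING, /* skip the NULL ping */
                .cred           = current_cred(),
        };

        /* Returns a live rpc_clnt or an ERR_PTR() on failure. */
        return rpc_create(&args);
}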
607
608 /*
609  * This function clones the RPC client structure. It allows us to share the
610  * same transport while varying parameters such as the authentication
611  * flavour.
612  */
613 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
614                                            struct rpc_clnt *clnt)
615 {
616         struct rpc_xprt_switch *xps;
617         struct rpc_xprt *xprt;
618         struct rpc_clnt *new;
619         int err;
620
621         err = -ENOMEM;
622         rcu_read_lock();
623         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
624         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
625         rcu_read_unlock();
626         if (xprt == NULL || xps == NULL) {
627                 xprt_put(xprt);
628                 xprt_switch_put(xps);
629                 goto out_err;
630         }
631         args->servername = xprt->servername;
632         args->nodename = clnt->cl_nodename;
633
634         new = rpc_new_client(args, xps, xprt, clnt);
635         if (IS_ERR(new))
636                 return new;
637
638         /* Turn off autobind on clones */
639         new->cl_autobind = 0;
640         new->cl_softrtry = clnt->cl_softrtry;
641         new->cl_softerr = clnt->cl_softerr;
642         new->cl_noretranstimeo = clnt->cl_noretranstimeo;
643         new->cl_discrtry = clnt->cl_discrtry;
644         new->cl_chatty = clnt->cl_chatty;
645         new->cl_principal = clnt->cl_principal;
646         return new;
647
648 out_err:
649         trace_rpc_clnt_clone_err(clnt, err);
650         return ERR_PTR(err);
651 }
652
653 /**
654  * rpc_clone_client - Clone an RPC client structure
655  *
656  * @clnt: RPC client whose parameters are copied
657  *
658  * Returns a fresh RPC client or an ERR_PTR.
659  */
660 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
661 {
662         struct rpc_create_args args = {
663                 .program        = clnt->cl_program,
664                 .prognumber     = clnt->cl_prog,
665                 .version        = clnt->cl_vers,
666                 .authflavor     = clnt->cl_auth->au_flavor,
667                 .cred           = clnt->cl_cred,
668         };
669         return __rpc_clone_client(&args, clnt);
670 }
671 EXPORT_SYMBOL_GPL(rpc_clone_client);
672
673 /**
674  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
675  *
676  * @clnt: RPC client whose parameters are copied
677  * @flavor: security flavor for new client
678  *
679  * Returns a fresh RPC client or an ERR_PTR.
680  */
681 struct rpc_clnt *
682 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
683 {
684         struct rpc_create_args args = {
685                 .program        = clnt->cl_program,
686                 .prognumber     = clnt->cl_prog,
687                 .version        = clnt->cl_vers,
688                 .authflavor     = flavor,
689                 .cred           = clnt->cl_cred,
690         };
691         return __rpc_clone_client(&args, clnt);
692 }
693 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
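/*
 * Editor's note: illustrative sketch, not part of the original file.  It
 * clones an existing client so the same transport can be shared under a
 * different security flavor; "clnt" is assumed to be a live client owned by
 * the caller, and the clone must later be released with
 * rpc_shutdown_client().
 */
static struct rpc_clnt *example_clone_with_krb5(struct rpc_clnt *clnt)
{
        return rpc_clone_client_set_auth(clnt, RPC_AUTH_GSS_KRB5);
}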
694
695 /**
696  * rpc_switch_client_transport: switch the RPC transport on the fly
697  * @clnt: pointer to a struct rpc_clnt
698  * @args: pointer to the new transport arguments
699  * @timeout: pointer to the new timeout parameters
700  *
701  * This function allows the caller to switch the RPC transport for the
702  * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
703  * server, for instance.  It assumes that the caller has ensured that
704  * there are no active RPC tasks by using some form of locking.
705  *
706  * Returns zero if "clnt" is now using the new xprt.  Otherwise a
707  * negative errno is returned, and "clnt" continues to use the old
708  * xprt.
709  */
710 int rpc_switch_client_transport(struct rpc_clnt *clnt,
711                 struct xprt_create *args,
712                 const struct rpc_timeout *timeout)
713 {
714         const struct rpc_timeout *old_timeo;
715         rpc_authflavor_t pseudoflavor;
716         struct rpc_xprt_switch *xps, *oldxps;
717         struct rpc_xprt *xprt, *old;
718         struct rpc_clnt *parent;
719         int err;
720
721         xprt = xprt_create_transport(args);
722         if (IS_ERR(xprt))
723                 return PTR_ERR(xprt);
724
725         xps = xprt_switch_alloc(xprt, GFP_KERNEL);
726         if (xps == NULL) {
727                 xprt_put(xprt);
728                 return -ENOMEM;
729         }
730
731         pseudoflavor = clnt->cl_auth->au_flavor;
732
733         old_timeo = clnt->cl_timeout;
734         old = rpc_clnt_set_transport(clnt, xprt, timeout);
735         oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
736
737         rpc_unregister_client(clnt);
738         __rpc_clnt_remove_pipedir(clnt);
739         rpc_sysfs_client_destroy(clnt);
740         rpc_clnt_debugfs_unregister(clnt);
741
742         /*
743          * A new transport was created.  "clnt" therefore
744          * becomes the root of a new cl_parent tree.  clnt's
745          * children, if it has any, still point to the old xprt.
746          */
747         parent = clnt->cl_parent;
748         clnt->cl_parent = clnt;
749
750         /*
751          * The old rpc_auth cache cannot be re-used.  GSS
752          * contexts in particular are between a single
753          * client and server.
754          */
755         err = rpc_client_register(clnt, pseudoflavor, NULL);
756         if (err)
757                 goto out_revert;
758
759         synchronize_rcu();
760         if (parent != clnt)
761                 rpc_release_client(parent);
762         xprt_switch_put(oldxps);
763         xprt_put(old);
764         trace_rpc_clnt_replace_xprt(clnt);
765         return 0;
766
767 out_revert:
768         xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
769         rpc_clnt_set_transport(clnt, old, old_timeo);
770         clnt->cl_parent = parent;
771         rpc_client_register(clnt, pseudoflavor, NULL);
772         xprt_switch_put(xps);
773         xprt_put(xprt);
774         trace_rpc_clnt_replace_xprt_err(clnt);
775         return err;
776 }
777 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
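/*
 * Editor's note: illustrative sketch, not part of the original file.  It
 * swaps the client's transport to a new (hypothetical) mirror address,
 * assuming the caller has already quiesced all RPC tasks as the kernel-doc
 * above requires.  The existing timeout parameters are carried over.
 */
static int example_switch_to_mirror(struct rpc_clnt *clnt,
                                    struct sockaddr *mirror_addr,
                                    size_t mirror_addrlen)
{
        struct xprt_create xprtargs = {
                .ident          = XPRT_TRANSPORT_TCP,
                .net            = rpc_net_ns(clnt),
                .dstaddr        = mirror_addr,
                .addrlen        = mirror_addrlen,
                .servername     = "mirror-server",      /* hypothetical */
        };

        return rpc_switch_client_transport(clnt, &xprtargs, clnt->cl_timeout);
}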
778
779 static
780 int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
781 {
782         struct rpc_xprt_switch *xps;
783
784         rcu_read_lock();
785         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
786         rcu_read_unlock();
787         if (xps == NULL)
788                 return -EAGAIN;
789         xprt_iter_init_listall(xpi, xps);
790         xprt_switch_put(xps);
791         return 0;
792 }
793
794 /**
795  * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
796  * @clnt: pointer to client
797  * @fn: function to apply
798  * @data: void pointer to function data
799  *
800  * Iterates through the list of RPC transports currently attached to the
801  * client and applies the function fn(clnt, xprt, data).
802  *
803  * On error, the iteration stops, and the function returns the error value.
804  */
805 int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
806                 int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
807                 void *data)
808 {
809         struct rpc_xprt_iter xpi;
810         int ret;
811
812         ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
813         if (ret)
814                 return ret;
815         for (;;) {
816                 struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
817
818                 if (!xprt)
819                         break;
820                 ret = fn(clnt, xprt, data);
821                 xprt_put(xprt);
822                 if (ret < 0)
823                         break;
824         }
825         xprt_iter_destroy(&xpi);
826         return ret;
827 }
828 EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
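/*
 * Editor's note: illustrative sketch, not part of the original file.  A
 * trivial per-transport callback that counts the transports attached to a
 * client; the iterator applies it to each xprt in turn and stops early only
 * on a negative return value.
 */
static int example_count_one_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt,
                                  void *data)
{
        unsigned int *count = data;

        (*count)++;
        return 0;
}

static unsigned int example_count_transports(struct rpc_clnt *clnt)
{
        unsigned int count = 0;

        rpc_clnt_iterate_for_each_xprt(clnt, example_count_one_xprt, &count);
        return count;
}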
829
830 /*
831  * Kill all tasks for the given client.
832  * XXX: kill their descendants as well?
833  */
834 void rpc_killall_tasks(struct rpc_clnt *clnt)
835 {
836         struct rpc_task *rovr;
837
838
839         if (list_empty(&clnt->cl_tasks))
840                 return;
841
842         /*
843          * Spin lock all_tasks to prevent changes...
844          */
845         trace_rpc_clnt_killall(clnt);
846         spin_lock(&clnt->cl_lock);
847         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
848                 rpc_signal_task(rovr);
849         spin_unlock(&clnt->cl_lock);
850 }
851 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
852
853 /*
854  * Properly shut down an RPC client, terminating all outstanding
855  * requests.
856  */
857 void rpc_shutdown_client(struct rpc_clnt *clnt)
858 {
859         might_sleep();
860
861         trace_rpc_clnt_shutdown(clnt);
862
863         while (!list_empty(&clnt->cl_tasks)) {
864                 rpc_killall_tasks(clnt);
865                 wait_event_timeout(destroy_wait,
866                         list_empty(&clnt->cl_tasks), 1*HZ);
867         }
868
869         rpc_release_client(clnt);
870 }
871 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
872
873 /*
874  * Free an RPC client
875  */
876 static void rpc_free_client_work(struct work_struct *work)
877 {
878         struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
879
880         trace_rpc_clnt_free(clnt);
881
882         /* These might block on processes that might allocate memory,
883          * so they cannot be called in rpciod and are therefore handled
884          * separately here.
885          */
886         rpc_sysfs_client_destroy(clnt);
887         rpc_clnt_debugfs_unregister(clnt);
888         rpc_free_clid(clnt);
889         rpc_clnt_remove_pipedir(clnt);
890         xprt_put(rcu_dereference_raw(clnt->cl_xprt));
891
892         kfree(clnt);
893         rpciod_down();
894 }
895 static struct rpc_clnt *
896 rpc_free_client(struct rpc_clnt *clnt)
897 {
898         struct rpc_clnt *parent = NULL;
899
900         trace_rpc_clnt_release(clnt);
901         if (clnt->cl_parent != clnt)
902                 parent = clnt->cl_parent;
903         rpc_unregister_client(clnt);
904         rpc_free_iostats(clnt->cl_metrics);
905         clnt->cl_metrics = NULL;
906         xprt_iter_destroy(&clnt->cl_xpi);
907         put_cred(clnt->cl_cred);
908
909         INIT_WORK(&clnt->cl_work, rpc_free_client_work);
910         schedule_work(&clnt->cl_work);
911         return parent;
912 }
913
914 /*
915  * Free an RPC client
916  */
917 static struct rpc_clnt *
918 rpc_free_auth(struct rpc_clnt *clnt)
919 {
920         if (clnt->cl_auth == NULL)
921                 return rpc_free_client(clnt);
922
923         /*
924          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
925          *       release remaining GSS contexts. This mechanism ensures
926          *       that it can do so safely.
927          */
928         atomic_inc(&clnt->cl_count);
929         rpcauth_release(clnt->cl_auth);
930         clnt->cl_auth = NULL;
931         if (atomic_dec_and_test(&clnt->cl_count))
932                 return rpc_free_client(clnt);
933         return NULL;
934 }
935
936 /*
937  * Release reference to the RPC client
938  */
939 void
940 rpc_release_client(struct rpc_clnt *clnt)
941 {
942         do {
943                 if (list_empty(&clnt->cl_tasks))
944                         wake_up(&destroy_wait);
945                 if (!atomic_dec_and_test(&clnt->cl_count))
946                         break;
947                 clnt = rpc_free_auth(clnt);
948         } while (clnt != NULL);
949 }
950 EXPORT_SYMBOL_GPL(rpc_release_client);
951
952 /**
953  * rpc_bind_new_program - bind a new RPC program to an existing client
954  * @old: old rpc_client
955  * @program: rpc program to set
956  * @vers: rpc program version
957  *
958  * Clones the rpc client and sets up a new RPC program. This is mainly
959  * of use for enabling different RPC programs to share the same transport.
960  * The Sun NFSv2/v3 ACL protocol can do this.
961  */
962 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
963                                       const struct rpc_program *program,
964                                       u32 vers)
965 {
966         struct rpc_create_args args = {
967                 .program        = program,
968                 .prognumber     = program->number,
969                 .version        = vers,
970                 .authflavor     = old->cl_auth->au_flavor,
971                 .cred           = old->cl_cred,
972         };
973         struct rpc_clnt *clnt;
974         int err;
975
976         clnt = __rpc_clone_client(&args, old);
977         if (IS_ERR(clnt))
978                 goto out;
979         err = rpc_ping(clnt);
980         if (err != 0) {
981                 rpc_shutdown_client(clnt);
982                 clnt = ERR_PTR(err);
983         }
984 out:
985         return clnt;
986 }
987 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
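/*
 * Editor's note: illustrative sketch, not part of the original file.  It
 * binds a second, hypothetical "example_acl_program" onto the transport of
 * an existing client, mirroring how NFS shares one connection between the
 * NFS and NFSACL programs; version 3 of that program is requested.
 */
static struct rpc_clnt *example_bind_acl(struct rpc_clnt *nfs_clnt,
                                const struct rpc_program *example_acl_program)
{
        return rpc_bind_new_program(nfs_clnt, example_acl_program, 3);
}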
988
989 struct rpc_xprt *
990 rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
991 {
992         struct rpc_xprt_switch *xps;
993
994         if (!xprt)
995                 return NULL;
996         rcu_read_lock();
997         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
998         atomic_long_inc(&xps->xps_queuelen);
999         rcu_read_unlock();
1000         atomic_long_inc(&xprt->queuelen);
1001
1002         return xprt;
1003 }
1004
1005 static void
1006 rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1007 {
1008         struct rpc_xprt_switch *xps;
1009
1010         atomic_long_dec(&xprt->queuelen);
1011         rcu_read_lock();
1012         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1013         atomic_long_dec(&xps->xps_queuelen);
1014         rcu_read_unlock();
1015
1016         xprt_put(xprt);
1017 }
1018
1019 void rpc_task_release_transport(struct rpc_task *task)
1020 {
1021         struct rpc_xprt *xprt = task->tk_xprt;
1022
1023         if (xprt) {
1024                 task->tk_xprt = NULL;
1025                 if (task->tk_client)
1026                         rpc_task_release_xprt(task->tk_client, xprt);
1027                 else
1028                         xprt_put(xprt);
1029         }
1030 }
1031 EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1032
1033 void rpc_task_release_client(struct rpc_task *task)
1034 {
1035         struct rpc_clnt *clnt = task->tk_client;
1036
1037         rpc_task_release_transport(task);
1038         if (clnt != NULL) {
1039                 /* Remove from client task list */
1040                 spin_lock(&clnt->cl_lock);
1041                 list_del(&task->tk_task);
1042                 spin_unlock(&clnt->cl_lock);
1043                 task->tk_client = NULL;
1044
1045                 rpc_release_client(clnt);
1046         }
1047 }
1048
1049 static struct rpc_xprt *
1050 rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1051 {
1052         struct rpc_xprt *xprt;
1053
1054         rcu_read_lock();
1055         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1056         rcu_read_unlock();
1057         return rpc_task_get_xprt(clnt, xprt);
1058 }
1059
1060 static struct rpc_xprt *
1061 rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1062 {
1063         return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1064 }
1065
1066 static
1067 void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1068 {
1069         if (task->tk_xprt)
1070                 return;
1071         if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1072                 task->tk_xprt = rpc_task_get_first_xprt(clnt);
1073         else
1074                 task->tk_xprt = rpc_task_get_next_xprt(clnt);
1075 }
1076
1077 static
1078 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1079 {
1080
1081         if (clnt != NULL) {
1082                 rpc_task_set_transport(task, clnt);
1083                 task->tk_client = clnt;
1084                 atomic_inc(&clnt->cl_count);
1085                 if (clnt->cl_softrtry)
1086                         task->tk_flags |= RPC_TASK_SOFT;
1087                 if (clnt->cl_softerr)
1088                         task->tk_flags |= RPC_TASK_TIMEOUT;
1089                 if (clnt->cl_noretranstimeo)
1090                         task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1091                 if (atomic_read(&clnt->cl_swapper))
1092                         task->tk_flags |= RPC_TASK_SWAPPER;
1093                 /* Add to the client's list of all tasks */
1094                 spin_lock(&clnt->cl_lock);
1095                 list_add_tail(&task->tk_task, &clnt->cl_tasks);
1096                 spin_unlock(&clnt->cl_lock);
1097         }
1098 }
1099
1100 static void
1101 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1102 {
1103         if (msg != NULL) {
1104                 task->tk_msg.rpc_proc = msg->rpc_proc;
1105                 task->tk_msg.rpc_argp = msg->rpc_argp;
1106                 task->tk_msg.rpc_resp = msg->rpc_resp;
1107                 task->tk_msg.rpc_cred = msg->rpc_cred;
1108                 if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1109                         get_cred(task->tk_msg.rpc_cred);
1110         }
1111 }
1112
1113 /*
1114  * Default callback for async RPC calls
1115  */
1116 static void
1117 rpc_default_callback(struct rpc_task *task, void *data)
1118 {
1119 }
1120
1121 static const struct rpc_call_ops rpc_default_ops = {
1122         .rpc_call_done = rpc_default_callback,
1123 };
1124
1125 /**
1126  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1127  * @task_setup_data: pointer to task initialisation data
1128  */
1129 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1130 {
1131         struct rpc_task *task;
1132
1133         task = rpc_new_task(task_setup_data);
1134
1135         if (!RPC_IS_ASYNC(task))
1136                 task->tk_flags |= RPC_TASK_CRED_NOREF;
1137
1138         rpc_task_set_client(task, task_setup_data->rpc_client);
1139         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1140
1141         if (task->tk_action == NULL)
1142                 rpc_call_start(task);
1143
1144         atomic_inc(&task->tk_count);
1145         rpc_execute(task);
1146         return task;
1147 }
1148 EXPORT_SYMBOL_GPL(rpc_run_task);
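/*
 * Editor's note: illustrative sketch, not part of the original file.  It
 * starts an asynchronous task directly through rpc_run_task(); the
 * completion callback and the rpc_message contents are hypothetical and
 * would normally carry the caller's procedure, arguments and results.
 */
static void example_task_done(struct rpc_task *task, void *calldata)
{
        /* task->tk_status holds the RPC result once the task completes. */
}

static const struct rpc_call_ops example_task_ops = {
        .rpc_call_done  = example_task_done,
};

static int example_start_task(struct rpc_clnt *clnt,
                              const struct rpc_message *msg)
{
        struct rpc_task_setup setup = {
                .rpc_client     = clnt,
                .rpc_message    = msg,
                .callback_ops   = &example_task_ops,
                .flags          = RPC_TASK_ASYNC | RPC_TASK_SOFT,
        };
        struct rpc_task *task;

        task = rpc_run_task(&setup);
        if (IS_ERR(task))
                return PTR_ERR(task);
        /* Drop the extra reference rpc_run_task() returned to us. */
        rpc_put_task(task);
        return 0;
}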
1149
1150 /**
1151  * rpc_call_sync - Perform a synchronous RPC call
1152  * @clnt: pointer to RPC client
1153  * @msg: RPC call parameters
1154  * @flags: RPC call flags
1155  */
1156 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1157 {
1158         struct rpc_task *task;
1159         struct rpc_task_setup task_setup_data = {
1160                 .rpc_client = clnt,
1161                 .rpc_message = msg,
1162                 .callback_ops = &rpc_default_ops,
1163                 .flags = flags,
1164         };
1165         int status;
1166
1167         WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1168         if (flags & RPC_TASK_ASYNC) {
1169                 rpc_release_calldata(task_setup_data.callback_ops,
1170                         task_setup_data.callback_data);
1171                 return -EINVAL;
1172         }
1173
1174         task = rpc_run_task(&task_setup_data);
1175         if (IS_ERR(task))
1176                 return PTR_ERR(task);
1177         status = task->tk_status;
1178         rpc_put_task(task);
1179         return status;
1180 }
1181 EXPORT_SYMBOL_GPL(rpc_call_sync);
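/*
 * Editor's note: illustrative sketch, not part of the original file.  It
 * issues a blocking call for a hypothetical procedure "example_procinfo";
 * the argument and result structures are owned by the caller and must match
 * the procedure's XDR encode/decode routines.
 */
static int example_call_sync(struct rpc_clnt *clnt,
                             const struct rpc_procinfo *example_procinfo,
                             void *argp, void *resp)
{
        struct rpc_message msg = {
                .rpc_proc       = example_procinfo,
                .rpc_argp       = argp,
                .rpc_resp       = resp,
        };

        /* Blocks until the reply is decoded or the call fails. */
        return rpc_call_sync(clnt, &msg, RPC_TASK_SOFT);
}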
1182
1183 /**
1184  * rpc_call_async - Perform an asynchronous RPC call
1185  * @clnt: pointer to RPC client
1186  * @msg: RPC call parameters
1187  * @flags: RPC call flags
1188  * @tk_ops: RPC call ops
1189  * @data: user call data
1190  */
1191 int
1192 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1193                const struct rpc_call_ops *tk_ops, void *data)
1194 {
1195         struct rpc_task *task;
1196         struct rpc_task_setup task_setup_data = {
1197                 .rpc_client = clnt,
1198                 .rpc_message = msg,
1199                 .callback_ops = tk_ops,
1200                 .callback_data = data,
1201                 .flags = flags|RPC_TASK_ASYNC,
1202         };
1203
1204         task = rpc_run_task(&task_setup_data);
1205         if (IS_ERR(task))
1206                 return PTR_ERR(task);
1207         rpc_put_task(task);
1208         return 0;
1209 }
1210 EXPORT_SYMBOL_GPL(rpc_call_async);
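/*
 * Editor's note: illustrative sketch, not part of the original file.  The
 * same hypothetical procedure fired asynchronously; completion is reported
 * through "example_async_done" and the calldata pointer is handed back to
 * that callback unchanged.
 */
static void example_async_done(struct rpc_task *task, void *calldata)
{
        if (task->tk_status < 0)
                pr_debug("example async call failed: %d\n", task->tk_status);
}

static const struct rpc_call_ops example_async_ops = {
        .rpc_call_done  = example_async_done,
};

static int example_call_async(struct rpc_clnt *clnt,
                              const struct rpc_procinfo *example_procinfo,
                              void *argp, void *resp, void *calldata)
{
        struct rpc_message msg = {
                .rpc_proc       = example_procinfo,
                .rpc_argp       = argp,
                .rpc_resp       = resp,
        };

        return rpc_call_async(clnt, &msg, RPC_TASK_SOFT,
                              &example_async_ops, calldata);
}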
1211
1212 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1213 static void call_bc_encode(struct rpc_task *task);
1214
1215 /**
1216  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1217  * rpc_execute against it
1218  * @req: RPC request
1219  */
1220 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
1221 {
1222         struct rpc_task *task;
1223         struct rpc_task_setup task_setup_data = {
1224                 .callback_ops = &rpc_default_ops,
1225                 .flags = RPC_TASK_SOFTCONN |
1226                         RPC_TASK_NO_RETRANS_TIMEOUT,
1227         };
1228
1229         dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1230         /*
1231          * Create an rpc_task to send the data
1232          */
1233         task = rpc_new_task(&task_setup_data);
1234         xprt_init_bc_request(req, task);
1235
1236         task->tk_action = call_bc_encode;
1237         atomic_inc(&task->tk_count);
1238         WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1239         rpc_execute(task);
1240
1241         dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1242         return task;
1243 }
1244 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1245
1246 /**
1247  * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1248  * @req: RPC request to prepare
1249  * @pages: vector of struct page pointers
1250  * @base: offset in first page where receive should start, in bytes
1251  * @len: expected size of the upper layer data payload, in bytes
1252  * @hdrsize: expected size of upper layer reply header, in XDR words
1253  *
1254  */
1255 void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1256                              unsigned int base, unsigned int len,
1257                              unsigned int hdrsize)
1258 {
1259         hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1260
1261         xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1262         trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1263 }
1264 EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
1265
1266 void
1267 rpc_call_start(struct rpc_task *task)
1268 {
1269         task->tk_action = call_start;
1270 }
1271 EXPORT_SYMBOL_GPL(rpc_call_start);
1272
1273 /**
1274  * rpc_peeraddr - extract remote peer address from clnt's xprt
1275  * @clnt: RPC client structure
1276  * @buf: target buffer
1277  * @bufsize: length of target buffer
1278  *
1279  * Returns the number of bytes that are actually in the stored address.
1280  */
1281 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1282 {
1283         size_t bytes;
1284         struct rpc_xprt *xprt;
1285
1286         rcu_read_lock();
1287         xprt = rcu_dereference(clnt->cl_xprt);
1288
1289         bytes = xprt->addrlen;
1290         if (bytes > bufsize)
1291                 bytes = bufsize;
1292         memcpy(buf, &xprt->addr, bytes);
1293         rcu_read_unlock();
1294
1295         return bytes;
1296 }
1297 EXPORT_SYMBOL_GPL(rpc_peeraddr);
1298
1299 /**
1300  * rpc_peeraddr2str - return remote peer address in printable format
1301  * @clnt: RPC client structure
1302  * @format: address format
1303  *
1304  * NB: the lifetime of the memory referenced by the returned pointer is
1305  * the same as the rpc_xprt itself.  As long as the caller uses this
1306  * pointer, it must hold the RCU read lock.
1307  */
1308 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1309                              enum rpc_display_format_t format)
1310 {
1311         struct rpc_xprt *xprt;
1312
1313         xprt = rcu_dereference(clnt->cl_xprt);
1314
1315         if (xprt->address_strings[format] != NULL)
1316                 return xprt->address_strings[format];
1317         else
1318                 return "unprintable";
1319 }
1320 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
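/*
 * Editor's note: illustrative sketch, not part of the original file.  The
 * string returned by rpc_peeraddr2str() lives only as long as the rpc_xprt,
 * so a caller copies it out while holding the RCU read lock, as the
 * kernel-doc above requires.
 */
static void example_format_peer(struct rpc_clnt *clnt, char *buf, size_t buflen)
{
        rcu_read_lock();
        snprintf(buf, buflen, "%s:%s",
                 rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR),
                 rpc_peeraddr2str(clnt, RPC_DISPLAY_PORT));
        rcu_read_unlock();
}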
1321
1322 static const struct sockaddr_in rpc_inaddr_loopback = {
1323         .sin_family             = AF_INET,
1324         .sin_addr.s_addr        = htonl(INADDR_ANY),
1325 };
1326
1327 static const struct sockaddr_in6 rpc_in6addr_loopback = {
1328         .sin6_family            = AF_INET6,
1329         .sin6_addr              = IN6ADDR_ANY_INIT,
1330 };
1331
1332 /*
1333  * Try a getsockname() on a connected datagram socket.  Using a
1334  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1335  * This conserves the ephemeral port number space.
1336  *
1337  * Returns zero and fills in "buf" if successful; otherwise, a
1338  * negative errno is returned.
1339  */
1340 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1341                         struct sockaddr *buf)
1342 {
1343         struct socket *sock;
1344         int err;
1345
1346         err = __sock_create(net, sap->sa_family,
1347                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1348         if (err < 0) {
1349                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1350                 goto out;
1351         }
1352
1353         switch (sap->sa_family) {
1354         case AF_INET:
1355                 err = kernel_bind(sock,
1356                                 (struct sockaddr *)&rpc_inaddr_loopback,
1357                                 sizeof(rpc_inaddr_loopback));
1358                 break;
1359         case AF_INET6:
1360                 err = kernel_bind(sock,
1361                                 (struct sockaddr *)&rpc_in6addr_loopback,
1362                                 sizeof(rpc_in6addr_loopback));
1363                 break;
1364         default:
1365                 err = -EAFNOSUPPORT;
1366                 goto out;
1367         }
1368         if (err < 0) {
1369                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1370                 goto out_release;
1371         }
1372
1373         err = kernel_connect(sock, sap, salen, 0);
1374         if (err < 0) {
1375                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1376                 goto out_release;
1377         }
1378
1379         err = kernel_getsockname(sock, buf);
1380         if (err < 0) {
1381                 dprintk("RPC:       getsockname failed (%d)\n", err);
1382                 goto out_release;
1383         }
1384
1385         err = 0;
1386         if (buf->sa_family == AF_INET6) {
1387                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1388                 sin6->sin6_scope_id = 0;
1389         }
1390         dprintk("RPC:       %s succeeded\n", __func__);
1391
1392 out_release:
1393         sock_release(sock);
1394 out:
1395         return err;
1396 }
1397
1398 /*
1399  * Scraping a connected socket failed, so we don't have a usable
1400  * local address.  Fallback: generate an address that will prevent
1401  * the server from calling us back.
1402  *
1403  * Returns zero and fills in "buf" if successful; otherwise, a
1404  * negative errno is returned.
1405  */
1406 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1407 {
1408         switch (family) {
1409         case AF_INET:
1410                 if (buflen < sizeof(rpc_inaddr_loopback))
1411                         return -EINVAL;
1412                 memcpy(buf, &rpc_inaddr_loopback,
1413                                 sizeof(rpc_inaddr_loopback));
1414                 break;
1415         case AF_INET6:
1416                 if (buflen < sizeof(rpc_in6addr_loopback))
1417                         return -EINVAL;
1418                 memcpy(buf, &rpc_in6addr_loopback,
1419                                 sizeof(rpc_in6addr_loopback));
1420                 break;
1421         default:
1422                 dprintk("RPC:       %s: address family not supported\n",
1423                         __func__);
1424                 return -EAFNOSUPPORT;
1425         }
1426         dprintk("RPC:       %s: succeeded\n", __func__);
1427         return 0;
1428 }
1429
1430 /**
1431  * rpc_localaddr - discover local endpoint address for an RPC client
1432  * @clnt: RPC client structure
1433  * @buf: target buffer
1434  * @buflen: size of target buffer, in bytes
1435  *
1436  * Returns zero and fills in "buf" and "buflen" if successful;
1437  * otherwise, a negative errno is returned.
1438  *
1439  * This works even if the underlying transport is not currently connected,
1440  * or if the upper layer never previously provided a source address.
1441  *
1442  * The result of this function call is transient: multiple calls in
1443  * succession may give different results, depending on how local
1444  * networking configuration changes over time.
1445  */
1446 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1447 {
1448         struct sockaddr_storage address;
1449         struct sockaddr *sap = (struct sockaddr *)&address;
1450         struct rpc_xprt *xprt;
1451         struct net *net;
1452         size_t salen;
1453         int err;
1454
1455         rcu_read_lock();
1456         xprt = rcu_dereference(clnt->cl_xprt);
1457         salen = xprt->addrlen;
1458         memcpy(sap, &xprt->addr, salen);
1459         net = get_net(xprt->xprt_net);
1460         rcu_read_unlock();
1461
1462         rpc_set_port(sap, 0);
1463         err = rpc_sockname(net, sap, salen, buf);
1464         put_net(net);
1465         if (err != 0)
1466                 /* Couldn't discover local address, return ANYADDR */
1467                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1468         return 0;
1469 }
1470 EXPORT_SYMBOL_GPL(rpc_localaddr);
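/*
 * Editor's note: illustrative sketch, not part of the original file.  It
 * asks for the local endpoint address the client would use to reach its
 * server; on lookup failure rpc_localaddr() already falls back to the ANY
 * address for the transport's address family.
 */
static int example_get_local_addr(struct rpc_clnt *clnt,
                                  struct sockaddr_storage *ss)
{
        return rpc_localaddr(clnt, (struct sockaddr *)ss, sizeof(*ss));
}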
1471
1472 void
1473 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1474 {
1475         struct rpc_xprt *xprt;
1476
1477         rcu_read_lock();
1478         xprt = rcu_dereference(clnt->cl_xprt);
1479         if (xprt->ops->set_buffer_size)
1480                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1481         rcu_read_unlock();
1482 }
1483 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1484
1485 /**
1486  * rpc_net_ns - Get the network namespace for this RPC client
1487  * @clnt: RPC client to query
1488  *
1489  */
1490 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1491 {
1492         struct net *ret;
1493
1494         rcu_read_lock();
1495         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1496         rcu_read_unlock();
1497         return ret;
1498 }
1499 EXPORT_SYMBOL_GPL(rpc_net_ns);
1500
1501 /**
1502  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1503  * @clnt: RPC client to query
1504  *
1505  * For stream transports, this is one RPC record fragment (see RFC
1506  * 1831), as we don't support multi-record requests yet.  For datagram
1507  * transports, this is the size of an IP packet minus the IP, UDP, and
1508  * RPC header sizes.
1509  */
1510 size_t rpc_max_payload(struct rpc_clnt *clnt)
1511 {
1512         size_t ret;
1513
1514         rcu_read_lock();
1515         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1516         rcu_read_unlock();
1517         return ret;
1518 }
1519 EXPORT_SYMBOL_GPL(rpc_max_payload);
1520
1521 /**
1522  * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1523  * @clnt: RPC client to query
1524  */
1525 size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1526 {
1527         struct rpc_xprt *xprt;
1528         size_t ret;
1529
1530         rcu_read_lock();
1531         xprt = rcu_dereference(clnt->cl_xprt);
1532         ret = xprt->ops->bc_maxpayload(xprt);
1533         rcu_read_unlock();
1534         return ret;
1535 }
1536 EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1537
1538 unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1539 {
1540         struct rpc_xprt *xprt;
1541         unsigned int ret;
1542
1543         rcu_read_lock();
1544         xprt = rcu_dereference(clnt->cl_xprt);
1545         ret = xprt->ops->bc_num_slots(xprt);
1546         rcu_read_unlock();
1547         return ret;
1548 }
1549 EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1550
1551 /**
1552  * rpc_force_rebind - force transport to check that remote port is unchanged
1553  * @clnt: client to rebind
1554  *
1555  */
1556 void rpc_force_rebind(struct rpc_clnt *clnt)
1557 {
1558         if (clnt->cl_autobind) {
1559                 rcu_read_lock();
1560                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1561                 rcu_read_unlock();
1562         }
1563 }
1564 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1565
1566 static int
1567 __rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1568 {
1569         task->tk_status = 0;
1570         task->tk_rpc_status = 0;
1571         task->tk_action = action;
1572         return 1;
1573 }
1574
1575 /*
1576  * Restart an (async) RPC call. Usually called from within the
1577  * exit handler.
1578  */
1579 int
1580 rpc_restart_call(struct rpc_task *task)
1581 {
1582         return __rpc_restart_call(task, call_start);
1583 }
1584 EXPORT_SYMBOL_GPL(rpc_restart_call);
1585
1586 /*
1587  * Restart an (async) RPC call from the call_prepare state.
1588  * Usually called from within the exit handler.
1589  */
1590 int
1591 rpc_restart_call_prepare(struct rpc_task *task)
1592 {
1593         if (task->tk_ops->rpc_call_prepare != NULL)
1594                 return __rpc_restart_call(task, rpc_prepare_task);
1595         return rpc_restart_call(task);
1596 }
1597 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1598
1599 const char
1600 *rpc_proc_name(const struct rpc_task *task)
1601 {
1602         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1603
1604         if (proc) {
1605                 if (proc->p_name)
1606                         return proc->p_name;
1607                 else
1608                         return "NULL";
1609         } else
1610                 return "no proc";
1611 }
1612
1613 static void
1614 __rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
1615 {
1616         trace_rpc_call_rpcerror(task, tk_status, rpc_status);
1617         task->tk_rpc_status = rpc_status;
1618         rpc_exit(task, tk_status);
1619 }
1620
1621 static void
1622 rpc_call_rpcerror(struct rpc_task *task, int status)
1623 {
1624         __rpc_call_rpcerror(task, status, status);
1625 }
1626
1627 /*
1628  * 0.  Initial state
1629  *
1630  *     Other FSM states can be visited zero or more times, but
1631  *     this state is visited exactly once for each RPC.
1632  */
1633 static void
1634 call_start(struct rpc_task *task)
1635 {
1636         struct rpc_clnt *clnt = task->tk_client;
1637         int idx = task->tk_msg.rpc_proc->p_statidx;
1638
1639         trace_rpc_request(task);
1640
1641         /* Increment call count (version might not be valid for ping) */
1642         if (clnt->cl_program->version[clnt->cl_vers])
1643                 clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1644         clnt->cl_stats->rpccnt++;
1645         task->tk_action = call_reserve;
1646         rpc_task_set_transport(task, clnt);
1647 }
1648
1649 /*
1650  * 1.   Reserve an RPC call slot
1651  */
1652 static void
1653 call_reserve(struct rpc_task *task)
1654 {
1655         task->tk_status  = 0;
1656         task->tk_action  = call_reserveresult;
1657         xprt_reserve(task);
1658 }
1659
1660 static void call_retry_reserve(struct rpc_task *task);
1661
1662 /*
1663  * 1b.  Grok the result of xprt_reserve()
1664  */
1665 static void
1666 call_reserveresult(struct rpc_task *task)
1667 {
1668         int status = task->tk_status;
1669
1670         /*
1671          * After a call to xprt_reserve(), we must have either
1672          * a request slot or else an error status.
1673          */
1674         task->tk_status = 0;
1675         if (status >= 0) {
1676                 if (task->tk_rqstp) {
1677                         task->tk_action = call_refresh;
1678                         return;
1679                 }
1680
1681                 rpc_call_rpcerror(task, -EIO);
1682                 return;
1683         }
1684
1685         switch (status) {
1686         case -ENOMEM:
1687                 rpc_delay(task, HZ >> 2);
1688                 fallthrough;
1689         case -EAGAIN:   /* woken up; retry */
1690                 task->tk_action = call_retry_reserve;
1691                 return;
1692         default:
1693                 rpc_call_rpcerror(task, status);
1694         }
1695 }
1696
1697 /*
1698  * 1c.  Retry reserving an RPC call slot
1699  */
1700 static void
1701 call_retry_reserve(struct rpc_task *task)
1702 {
1703         task->tk_status  = 0;
1704         task->tk_action  = call_reserveresult;
1705         xprt_retry_reserve(task);
1706 }
1707
1708 /*
1709  * 2.   Bind and/or refresh the credentials
1710  */
1711 static void
1712 call_refresh(struct rpc_task *task)
1713 {
1714         task->tk_action = call_refreshresult;
1715         task->tk_status = 0;
1716         task->tk_client->cl_stats->rpcauthrefresh++;
1717         rpcauth_refreshcred(task);
1718 }
1719
1720 /*
1721  * 2a.  Process the results of a credential refresh
1722  */
1723 static void
1724 call_refreshresult(struct rpc_task *task)
1725 {
1726         int status = task->tk_status;
1727
1728         task->tk_status = 0;
1729         task->tk_action = call_refresh;
1730         switch (status) {
1731         case 0:
1732                 if (rpcauth_uptodatecred(task)) {
1733                         task->tk_action = call_allocate;
1734                         return;
1735                 }
1736                 /* Use rate-limiting and a max number of retries if refresh
1737                  * had status 0 but failed to update the cred.
1738                  */
1739                 fallthrough;
1740         case -ETIMEDOUT:
1741                 rpc_delay(task, 3*HZ);
1742                 fallthrough;
1743         case -EAGAIN:
1744                 status = -EACCES;
1745                 fallthrough;
1746         case -EKEYEXPIRED:
1747                 if (!task->tk_cred_retry)
1748                         break;
1749                 task->tk_cred_retry--;
1750                 trace_rpc_retry_refresh_status(task);
1751                 return;
1752         }
1753         trace_rpc_refresh_status(task);
1754         rpc_call_rpcerror(task, status);
1755 }
1756
1757 /*
1758  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1759  *      (Note: buffer memory is freed in xprt_release).
1760  */
1761 static void
1762 call_allocate(struct rpc_task *task)
1763 {
1764         const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1765         struct rpc_rqst *req = task->tk_rqstp;
1766         struct rpc_xprt *xprt = req->rq_xprt;
1767         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1768         int status;
1769
1770         task->tk_status = 0;
1771         task->tk_action = call_encode;
1772
1773         if (req->rq_buffer)
1774                 return;
1775
1776         if (proc->p_proc != 0) {
1777                 BUG_ON(proc->p_arglen == 0);
1778                 if (proc->p_decode != NULL)
1779                         BUG_ON(proc->p_replen == 0);
1780         }
1781
1782         /*
1783          * Calculate the size (in quads) of the RPC call
1784          * and reply headers, and convert both values
1785          * to byte sizes.
1786          */
1787         req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1788                            proc->p_arglen;
1789         req->rq_callsize <<= 2;
1790         /*
1791          * Note: the reply buffer must be large enough to hold the
1792          * 'struct accepted_reply' from RFC5531.
1793          */
1794         req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1795                         max_t(size_t, proc->p_replen, 2);
1796         req->rq_rcvsize <<= 2;
1797
1798         status = xprt->ops->buf_alloc(task);
1799         trace_rpc_buf_alloc(task, status);
1800         if (status == 0)
1801                 return;
1802         if (status != -ENOMEM) {
1803                 rpc_call_rpcerror(task, status);
1804                 return;
1805         }
1806
1807         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1808                 task->tk_action = call_allocate;
1809                 rpc_delay(task, HZ>>4);
1810                 return;
1811         }
1812
1813         rpc_call_rpcerror(task, -ERESTARTSYS);
1814 }
1815
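/*
 * A request needs to be (re-)encoded when it is not already queued for
 * transmission and either has never been sent, may be retransmitted on
 * timeout, or the transport has flagged it for retransmission.
 */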
1816 static int
1817 rpc_task_need_encode(struct rpc_task *task)
1818 {
1819         return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1820                 (!(task->tk_flags & RPC_TASK_SENT) ||
1821                  !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1822                  xprt_request_need_retransmit(task));
1823 }
1824
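/*
 * (Re-)initialize the send and receive XDR buffers, then encode the RPC
 * call header and let the auth flavour wrap the call arguments.
 */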
1825 static void
1826 rpc_xdr_encode(struct rpc_task *task)
1827 {
1828         struct rpc_rqst *req = task->tk_rqstp;
1829         struct xdr_stream xdr;
1830
1831         xdr_buf_init(&req->rq_snd_buf,
1832                      req->rq_buffer,
1833                      req->rq_callsize);
1834         xdr_buf_init(&req->rq_rcv_buf,
1835                      req->rq_rbuffer,
1836                      req->rq_rcvsize);
1837
1838         req->rq_reply_bytes_recvd = 0;
1839         req->rq_snd_buf.head[0].iov_len = 0;
1840         xdr_init_encode(&xdr, &req->rq_snd_buf,
1841                         req->rq_snd_buf.head[0].iov_base, req);
1842         xdr_free_bvec(&req->rq_snd_buf);
1843         if (rpc_encode_header(task, &xdr))
1844                 return;
1845
1846         task->tk_status = rpcauth_wrap_req(task, &xdr);
1847 }
1848
1849 /*
1850  * 3.   Encode arguments of an RPC call
1851  */
1852 static void
1853 call_encode(struct rpc_task *task)
1854 {
1855         if (!rpc_task_need_encode(task))
1856                 goto out;
1857
1858         /* Dequeue task from the receive queue while we're encoding */
1859         xprt_request_dequeue_xprt(task);
1860         /* Encode here so that rpcsec_gss can use correct sequence number. */
1861         rpc_xdr_encode(task);
1862         /* Did the encode result in an error condition? */
1863         if (task->tk_status != 0) {
1864                 /* Was the error nonfatal? */
1865                 switch (task->tk_status) {
1866                 case -EAGAIN:
1867                 case -ENOMEM:
1868                         rpc_delay(task, HZ >> 4);
1869                         break;
1870                 case -EKEYEXPIRED:
1871                         if (!task->tk_cred_retry) {
1872                                 rpc_exit(task, task->tk_status);
1873                         } else {
1874                                 task->tk_action = call_refresh;
1875                                 task->tk_cred_retry--;
1876                                 trace_rpc_retry_refresh_status(task);
1877                         }
1878                         break;
1879                 default:
1880                         rpc_call_rpcerror(task, task->tk_status);
1881                 }
1882                 return;
1883         }
1884
1885         /* Add task to reply queue before transmission to avoid races */
1886         if (rpc_reply_expected(task))
1887                 xprt_request_enqueue_receive(task);
1888         xprt_request_enqueue_transmit(task);
1889 out:
1890         task->tk_action = call_transmit;
1891         /* Check that the connection is OK */
1892         if (!xprt_bound(task->tk_xprt))
1893                 task->tk_action = call_bind;
1894         else if (!xprt_connected(task->tk_xprt))
1895                 task->tk_action = call_connect;
1896 }
1897
1898 /*
1899  * Helpers to check if the task was already transmitted, and
1900  * to take action when that is the case.
1901  */
1902 static bool
1903 rpc_task_transmitted(struct rpc_task *task)
1904 {
1905         return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1906 }
1907
1908 static void
1909 rpc_task_handle_transmitted(struct rpc_task *task)
1910 {
1911         xprt_end_transmit(task);
1912         task->tk_action = call_transmit_status;
1913 }
1914
1915 /*
1916  * 4.   Get the server port number if not yet set
1917  */
1918 static void
1919 call_bind(struct rpc_task *task)
1920 {
1921         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1922
1923         if (rpc_task_transmitted(task)) {
1924                 rpc_task_handle_transmitted(task);
1925                 return;
1926         }
1927
1928         if (xprt_bound(xprt)) {
1929                 task->tk_action = call_connect;
1930                 return;
1931         }
1932
1933         task->tk_action = call_bind_status;
1934         if (!xprt_prepare_transmit(task))
1935                 return;
1936
1937         xprt->ops->rpcbind(task);
1938 }
1939
1940 /*
1941  * 4a.  Sort out bind result
1942  */
1943 static void
1944 call_bind_status(struct rpc_task *task)
1945 {
1946         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1947         int status = -EIO;
1948
1949         if (rpc_task_transmitted(task)) {
1950                 rpc_task_handle_transmitted(task);
1951                 return;
1952         }
1953
1954         if (task->tk_status >= 0)
1955                 goto out_next;
1956         if (xprt_bound(xprt)) {
1957                 task->tk_status = 0;
1958                 goto out_next;
1959         }
1960
1961         switch (task->tk_status) {
1962         case -ENOMEM:
1963                 rpc_delay(task, HZ >> 2);
1964                 goto retry_timeout;
1965         case -EACCES:
1966                 trace_rpcb_prog_unavail_err(task);
1967                 /* fail immediately if this is an RPC ping */
1968                 if (task->tk_msg.rpc_proc->p_proc == 0) {
1969                         status = -EOPNOTSUPP;
1970                         break;
1971                 }
1972                 if (task->tk_rebind_retry == 0)
1973                         break;
1974                 task->tk_rebind_retry--;
1975                 rpc_delay(task, 3*HZ);
1976                 goto retry_timeout;
1977         case -ENOBUFS:
1978                 rpc_delay(task, HZ >> 2);
1979                 goto retry_timeout;
1980         case -EAGAIN:
1981                 goto retry_timeout;
1982         case -ETIMEDOUT:
1983                 trace_rpcb_timeout_err(task);
1984                 goto retry_timeout;
1985         case -EPFNOSUPPORT:
1986                 /* server doesn't support any rpcbind version we know of */
1987                 trace_rpcb_bind_version_err(task);
1988                 break;
1989         case -EPROTONOSUPPORT:
1990                 trace_rpcb_bind_version_err(task);
1991                 goto retry_timeout;
1992         case -ECONNREFUSED:             /* connection problems */
1993         case -ECONNRESET:
1994         case -ECONNABORTED:
1995         case -ENOTCONN:
1996         case -EHOSTDOWN:
1997         case -ENETDOWN:
1998         case -EHOSTUNREACH:
1999         case -ENETUNREACH:
2000         case -EPIPE:
2001                 trace_rpcb_unreachable_err(task);
2002                 if (!RPC_IS_SOFTCONN(task)) {
2003                         rpc_delay(task, 5*HZ);
2004                         goto retry_timeout;
2005                 }
2006                 status = task->tk_status;
2007                 break;
2008         default:
2009                 trace_rpcb_unrecognized_err(task);
2010         }
2011
2012         rpc_call_rpcerror(task, status);
2013         return;
2014 out_next:
2015         task->tk_action = call_connect;
2016         return;
2017 retry_timeout:
2018         task->tk_status = 0;
2019         task->tk_action = call_bind;
2020         rpc_check_timeout(task);
2021 }
2022
2023 /*
2024  * 4b.  Connect to the RPC server
2025  */
2026 static void
2027 call_connect(struct rpc_task *task)
2028 {
2029         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2030
2031         if (rpc_task_transmitted(task)) {
2032                 rpc_task_handle_transmitted(task);
2033                 return;
2034         }
2035
2036         if (xprt_connected(xprt)) {
2037                 task->tk_action = call_transmit;
2038                 return;
2039         }
2040
2041         task->tk_action = call_connect_status;
2042         if (task->tk_status < 0)
2043                 return;
2044         if (task->tk_flags & RPC_TASK_NOCONNECT) {
2045                 rpc_call_rpcerror(task, -ENOTCONN);
2046                 return;
2047         }
2048         if (!xprt_prepare_transmit(task))
2049                 return;
2050         xprt_connect(task);
2051 }
2052
2053 /*
2054  * 4c.  Sort out connect result
2055  */
2056 static void
2057 call_connect_status(struct rpc_task *task)
2058 {
2059         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2060         struct rpc_clnt *clnt = task->tk_client;
2061         int status = task->tk_status;
2062
2063         if (rpc_task_transmitted(task)) {
2064                 rpc_task_handle_transmitted(task);
2065                 return;
2066         }
2067
2068         trace_rpc_connect_status(task);
2069
2070         if (task->tk_status == 0) {
2071                 clnt->cl_stats->netreconn++;
2072                 goto out_next;
2073         }
2074         if (xprt_connected(xprt)) {
2075                 task->tk_status = 0;
2076                 goto out_next;
2077         }
2078
2079         task->tk_status = 0;
2080         switch (status) {
2081         case -ECONNREFUSED:
2082                 /* A positive refusal suggests a rebind is needed. */
2083                 if (RPC_IS_SOFTCONN(task))
2084                         break;
2085                 if (clnt->cl_autobind) {
2086                         rpc_force_rebind(clnt);
2087                         goto out_retry;
2088                 }
2089                 fallthrough;
2090         case -ECONNRESET:
2091         case -ECONNABORTED:
2092         case -ENETDOWN:
2093         case -ENETUNREACH:
2094         case -EHOSTUNREACH:
2095         case -EPIPE:
2096         case -EPROTO:
2097                 xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
2098                                             task->tk_rqstp->rq_connect_cookie);
2099                 if (RPC_IS_SOFTCONN(task))
2100                         break;
2101                 /* retry with existing socket, after a delay */
2102                 rpc_delay(task, 3*HZ);
2103                 fallthrough;
2104         case -EADDRINUSE:
2105         case -ENOTCONN:
2106         case -EAGAIN:
2107         case -ETIMEDOUT:
2108                 goto out_retry;
2109         case -ENOBUFS:
2110                 rpc_delay(task, HZ >> 2);
2111                 goto out_retry;
2112         }
2113         rpc_call_rpcerror(task, status);
2114         return;
2115 out_next:
2116         task->tk_action = call_transmit;
2117         return;
2118 out_retry:
2119         /* Check for timeouts before looping back to call_bind */
2120         task->tk_action = call_bind;
2121         rpc_check_timeout(task);
2122 }
2123
2124 /*
2125  * 5.   Transmit the RPC request, and wait for reply
2126  */
2127 static void
2128 call_transmit(struct rpc_task *task)
2129 {
2130         if (rpc_task_transmitted(task)) {
2131                 rpc_task_handle_transmitted(task);
2132                 return;
2133         }
2134
2135         task->tk_action = call_transmit_status;
2136         if (!xprt_prepare_transmit(task))
2137                 return;
2138         task->tk_status = 0;
2139         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2140                 if (!xprt_connected(task->tk_xprt)) {
2141                         task->tk_status = -ENOTCONN;
2142                         return;
2143                 }
2144                 xprt_transmit(task);
2145         }
2146         xprt_end_transmit(task);
2147 }
2148
2149 /*
2150  * 5a.  Handle cleanup after a transmission
2151  */
2152 static void
2153 call_transmit_status(struct rpc_task *task)
2154 {
2155         task->tk_action = call_status;
2156
2157         /*
2158          * Common case: success.  Force the compiler to put this
2159          * test first.
2160          */
2161         if (rpc_task_transmitted(task)) {
2162                 task->tk_status = 0;
2163                 xprt_request_wait_receive(task);
2164                 return;
2165         }
2166
2167         switch (task->tk_status) {
2168         default:
2169                 break;
2170         case -EBADMSG:
2171                 task->tk_status = 0;
2172                 task->tk_action = call_encode;
2173                 break;
2174                 /*
2175                  * Special cases: if we've been waiting on the
2176                  * socket's write_space() callback, or if the
2177                  * socket just returned a connection error,
2178                  * then hold onto the transport lock.
2179                  */
2180         case -ENOBUFS:
2181                 rpc_delay(task, HZ>>2);
2182                 fallthrough;
2183         case -EBADSLT:
2184         case -EAGAIN:
2185                 task->tk_action = call_transmit;
2186                 task->tk_status = 0;
2187                 break;
2188         case -ECONNREFUSED:
2189         case -EHOSTDOWN:
2190         case -ENETDOWN:
2191         case -EHOSTUNREACH:
2192         case -ENETUNREACH:
2193         case -EPERM:
2194                 if (RPC_IS_SOFTCONN(task)) {
2195                         if (!task->tk_msg.rpc_proc->p_proc)
2196                                 trace_xprt_ping(task->tk_xprt,
2197                                                 task->tk_status);
2198                         rpc_call_rpcerror(task, task->tk_status);
2199                         return;
2200                 }
2201                 fallthrough;
2202         case -ECONNRESET:
2203         case -ECONNABORTED:
2204         case -EADDRINUSE:
2205         case -ENOTCONN:
2206         case -EPIPE:
2207                 task->tk_action = call_bind;
2208                 task->tk_status = 0;
2209                 break;
2210         }
2211         rpc_check_timeout(task);
2212 }
2213
2214 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2215 static void call_bc_transmit(struct rpc_task *task);
2216 static void call_bc_transmit_status(struct rpc_task *task);
2217
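/*
 * Queue the backchannel reply for transmission; unlike the forward
 * channel there is no XDR encode step here.
 */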
2218 static void
2219 call_bc_encode(struct rpc_task *task)
2220 {
2221         xprt_request_enqueue_transmit(task);
2222         task->tk_action = call_bc_transmit;
2223 }
2224
2225 /*
2226  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2227  * addition, disconnect on connectivity errors.
2228  */
2229 static void
2230 call_bc_transmit(struct rpc_task *task)
2231 {
2232         task->tk_action = call_bc_transmit_status;
2233         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2234                 if (!xprt_prepare_transmit(task))
2235                         return;
2236                 task->tk_status = 0;
2237                 xprt_transmit(task);
2238         }
2239         xprt_end_transmit(task);
2240 }
2241
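/*
 * Handle cleanup after a backchannel transmission: transient send errors
 * are retried, -ETIMEDOUT disconnects the transport, and anything else
 * simply drops the reply.
 */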
2242 static void
2243 call_bc_transmit_status(struct rpc_task *task)
2244 {
2245         struct rpc_rqst *req = task->tk_rqstp;
2246
2247         if (rpc_task_transmitted(task))
2248                 task->tk_status = 0;
2249
2250         switch (task->tk_status) {
2251         case 0:
2252                 /* Success */
2253         case -ENETDOWN:
2254         case -EHOSTDOWN:
2255         case -EHOSTUNREACH:
2256         case -ENETUNREACH:
2257         case -ECONNRESET:
2258         case -ECONNREFUSED:
2259         case -EADDRINUSE:
2260         case -ENOTCONN:
2261         case -EPIPE:
2262                 break;
2263         case -ENOBUFS:
2264                 rpc_delay(task, HZ>>2);
2265                 fallthrough;
2266         case -EBADSLT:
2267         case -EAGAIN:
2268                 task->tk_status = 0;
2269                 task->tk_action = call_bc_transmit;
2270                 return;
2271         case -ETIMEDOUT:
2272                 /*
2273                  * Problem reaching the server.  Disconnect and let the
2274                  * forechannel reestablish the connection.  The server will
2275                  * have to retransmit the backchannel request and we'll
2276                  * reprocess it.  Since these ops are idempotent, there's no
2277                  * need to cache our reply at this time.
2278                  */
2279                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2280                         "error: %d\n", task->tk_status);
2281                 xprt_conditional_disconnect(req->rq_xprt,
2282                         req->rq_connect_cookie);
2283                 break;
2284         default:
2285                 /*
2286                  * We were unable to reply and will have to drop the
2287                  * request.  The server should reconnect and retransmit.
2288                  */
2289                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2290                         "error: %d\n", task->tk_status);
2291                 break;
2292         }
2293         task->tk_action = rpc_exit_task;
2294 }
2295 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2296
2297 /*
2298  * 6.   Sort out the RPC call status
2299  */
2300 static void
2301 call_status(struct rpc_task *task)
2302 {
2303         struct rpc_clnt *clnt = task->tk_client;
2304         int             status;
2305
2306         if (!task->tk_msg.rpc_proc->p_proc)
2307                 trace_xprt_ping(task->tk_xprt, task->tk_status);
2308
2309         status = task->tk_status;
2310         if (status >= 0) {
2311                 task->tk_action = call_decode;
2312                 return;
2313         }
2314
2315         trace_rpc_call_status(task);
2316         task->tk_status = 0;
2317         switch(status) {
2318         case -EHOSTDOWN:
2319         case -ENETDOWN:
2320         case -EHOSTUNREACH:
2321         case -ENETUNREACH:
2322         case -EPERM:
2323                 if (RPC_IS_SOFTCONN(task))
2324                         goto out_exit;
2325                 /*
2326                  * Delay any retries for 3 seconds, then handle as if it
2327                  * were a timeout.
2328                  */
2329                 rpc_delay(task, 3*HZ);
2330                 fallthrough;
2331         case -ETIMEDOUT:
2332                 break;
2333         case -ECONNREFUSED:
2334         case -ECONNRESET:
2335         case -ECONNABORTED:
2336         case -ENOTCONN:
2337                 rpc_force_rebind(clnt);
2338                 break;
2339         case -EADDRINUSE:
2340                 rpc_delay(task, 3*HZ);
2341                 fallthrough;
2342         case -EPIPE:
2343         case -EAGAIN:
2344                 break;
2345         case -EIO:
2346                 /* shutdown or soft timeout */
2347                 goto out_exit;
2348         default:
2349                 if (clnt->cl_chatty)
2350                         printk("%s: RPC call returned error %d\n",
2351                                clnt->cl_program->name, -status);
2352                 goto out_exit;
2353         }
2354         task->tk_action = call_encode;
2355         if (status != -ECONNRESET && status != -ECONNABORTED)
2356                 rpc_check_timeout(task);
2357         return;
2358 out_exit:
2359         rpc_call_rpcerror(task, status);
2360 }
2361
2362 static bool
2363 rpc_check_connected(const struct rpc_rqst *req)
2364 {
2365         /* No allocated request or transport? Return true */
2366         if (!req || !req->rq_xprt)
2367                 return true;
2368         return xprt_connected(req->rq_xprt);
2369 }
2370
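/*
 * Decide what to do about a major timeout: soft tasks are failed with
 * -ETIMEDOUT (or -EIO), while hard tasks warn once, force a rebind and
 * invalidate the credential before the caller retries.
 */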
2371 static void
2372 rpc_check_timeout(struct rpc_task *task)
2373 {
2374         struct rpc_clnt *clnt = task->tk_client;
2375
2376         if (RPC_SIGNALLED(task)) {
2377                 rpc_call_rpcerror(task, -ERESTARTSYS);
2378                 return;
2379         }
2380
2381         if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2382                 return;
2383
2384         trace_rpc_timeout_status(task);
2385         task->tk_timeouts++;
2386
2387         if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2388                 rpc_call_rpcerror(task, -ETIMEDOUT);
2389                 return;
2390         }
2391
2392         if (RPC_IS_SOFT(task)) {
2393                 /*
2394          * Once a "no retrans timeout" soft task (a.k.a. NFSv4) has
2395                  * been sent, it should time out only if the transport
2396                  * connection gets terminally broken.
2397                  */
2398                 if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2399                     rpc_check_connected(task->tk_rqstp))
2400                         return;
2401
2402                 if (clnt->cl_chatty) {
2403                         pr_notice_ratelimited(
2404                                 "%s: server %s not responding, timed out\n",
2405                                 clnt->cl_program->name,
2406                                 task->tk_xprt->servername);
2407                 }
2408                 if (task->tk_flags & RPC_TASK_TIMEOUT)
2409                         rpc_call_rpcerror(task, -ETIMEDOUT);
2410                 else
2411                         __rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2412                 return;
2413         }
2414
2415         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2416                 task->tk_flags |= RPC_CALL_MAJORSEEN;
2417                 if (clnt->cl_chatty) {
2418                         pr_notice_ratelimited(
2419                                 "%s: server %s not responding, still trying\n",
2420                                 clnt->cl_program->name,
2421                                 task->tk_xprt->servername);
2422                 }
2423         }
2424         rpc_force_rebind(clnt);
2425         /*
2426          * Did our request time out due to an RPCSEC_GSS out-of-sequence
2427          * event? RFC2203 requires the server to drop all such requests.
2428          */
2429         rpcauth_invalcred(task);
2430 }
2431
2432 /*
2433  * 7.   Decode the RPC reply
2434  */
2435 static void
2436 call_decode(struct rpc_task *task)
2437 {
2438         struct rpc_clnt *clnt = task->tk_client;
2439         struct rpc_rqst *req = task->tk_rqstp;
2440         struct xdr_stream xdr;
2441         int err;
2442
2443         if (!task->tk_msg.rpc_proc->p_decode) {
2444                 task->tk_action = rpc_exit_task;
2445                 return;
2446         }
2447
2448         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2449                 if (clnt->cl_chatty) {
2450                         pr_notice_ratelimited("%s: server %s OK\n",
2451                                 clnt->cl_program->name,
2452                                 task->tk_xprt->servername);
2453                 }
2454                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2455         }
2456
2457         /*
2458          * Did we ever call xprt_complete_rqst()? If not, we should assume
2459          * the message is incomplete.
2460          */
2461         err = -EAGAIN;
2462         if (!req->rq_reply_bytes_recvd)
2463                 goto out;
2464
2465         /* Ensure that we see all writes made by xprt_complete_rqst()
2466          * before it changed req->rq_reply_bytes_recvd.
2467          */
2468         smp_rmb();
2469
2470         req->rq_rcv_buf.len = req->rq_private_buf.len;
2471         trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2472
2473         /* Check that the softirq receive buffer is valid */
2474         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2475                                 sizeof(req->rq_rcv_buf)) != 0);
2476
2477         xdr_init_decode(&xdr, &req->rq_rcv_buf,
2478                         req->rq_rcv_buf.head[0].iov_base, req);
2479         err = rpc_decode_header(task, &xdr);
2480 out:
2481         switch (err) {
2482         case 0:
2483                 task->tk_action = rpc_exit_task;
2484                 task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2485                 return;
2486         case -EAGAIN:
2487                 task->tk_status = 0;
2488                 if (task->tk_client->cl_discrtry)
2489                         xprt_conditional_disconnect(req->rq_xprt,
2490                                                     req->rq_connect_cookie);
2491                 task->tk_action = call_encode;
2492                 rpc_check_timeout(task);
2493                 break;
2494         case -EKEYREJECTED:
2495                 task->tk_action = call_reserve;
2496                 rpc_check_timeout(task);
2497                 rpcauth_invalcred(task);
2498                 /* Ensure we obtain a new XID if we retry! */
2499                 xprt_release(task);
2500         }
2501 }
2502
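/*
 * Marshal the RPC call header: XID, message direction, RPC version,
 * program, version and procedure numbers, followed by the credential.
 */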
2503 static int
2504 rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2505 {
2506         struct rpc_clnt *clnt = task->tk_client;
2507         struct rpc_rqst *req = task->tk_rqstp;
2508         __be32 *p;
2509         int error;
2510
2511         error = -EMSGSIZE;
2512         p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2513         if (!p)
2514                 goto out_fail;
2515         *p++ = req->rq_xid;
2516         *p++ = rpc_call;
2517         *p++ = cpu_to_be32(RPC_VERSION);
2518         *p++ = cpu_to_be32(clnt->cl_prog);
2519         *p++ = cpu_to_be32(clnt->cl_vers);
2520         *p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2521
2522         error = rpcauth_marshcred(task, xdr);
2523         if (error < 0)
2524                 goto out_fail;
2525         return 0;
2526 out_fail:
2527         trace_rpc_bad_callhdr(task);
2528         rpc_call_rpcerror(task, error);
2529         return error;
2530 }
2531
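/*
 * Parse the RPC reply header, mapping reply, accept and auth status
 * values onto errno codes and arranging a retry where that makes sense.
 */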
2532 static noinline int
2533 rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2534 {
2535         struct rpc_clnt *clnt = task->tk_client;
2536         int error;
2537         __be32 *p;
2538
2539         /* RFC-1014 says that the representation of XDR data must be a
2540          * multiple of four bytes
2541          * - if it isn't, pointer subtraction in the NFS client may give
2542          *   undefined results
2543          */
2544         if (task->tk_rqstp->rq_rcv_buf.len & 3)
2545                 goto out_unparsable;
2546
2547         p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2548         if (!p)
2549                 goto out_unparsable;
2550         p++;    /* skip XID */
2551         if (*p++ != rpc_reply)
2552                 goto out_unparsable;
2553         if (*p++ != rpc_msg_accepted)
2554                 goto out_msg_denied;
2555
2556         error = rpcauth_checkverf(task, xdr);
2557         if (error)
2558                 goto out_verifier;
2559
2560         p = xdr_inline_decode(xdr, sizeof(*p));
2561         if (!p)
2562                 goto out_unparsable;
2563         switch (*p) {
2564         case rpc_success:
2565                 return 0;
2566         case rpc_prog_unavail:
2567                 trace_rpc__prog_unavail(task);
2568                 error = -EPFNOSUPPORT;
2569                 goto out_err;
2570         case rpc_prog_mismatch:
2571                 trace_rpc__prog_mismatch(task);
2572                 error = -EPROTONOSUPPORT;
2573                 goto out_err;
2574         case rpc_proc_unavail:
2575                 trace_rpc__proc_unavail(task);
2576                 error = -EOPNOTSUPP;
2577                 goto out_err;
2578         case rpc_garbage_args:
2579         case rpc_system_err:
2580                 trace_rpc__garbage_args(task);
2581                 error = -EIO;
2582                 break;
2583         default:
2584                 goto out_unparsable;
2585         }
2586
2587 out_garbage:
2588         clnt->cl_stats->rpcgarbage++;
2589         if (task->tk_garb_retry) {
2590                 task->tk_garb_retry--;
2591                 task->tk_action = call_encode;
2592                 return -EAGAIN;
2593         }
2594 out_err:
2595         rpc_call_rpcerror(task, error);
2596         return error;
2597
2598 out_unparsable:
2599         trace_rpc__unparsable(task);
2600         error = -EIO;
2601         goto out_garbage;
2602
2603 out_verifier:
2604         trace_rpc_bad_verifier(task);
2605         goto out_garbage;
2606
2607 out_msg_denied:
2608         error = -EACCES;
2609         p = xdr_inline_decode(xdr, sizeof(*p));
2610         if (!p)
2611                 goto out_unparsable;
2612         switch (*p++) {
2613         case rpc_auth_error:
2614                 break;
2615         case rpc_mismatch:
2616                 trace_rpc__mismatch(task);
2617                 error = -EPROTONOSUPPORT;
2618                 goto out_err;
2619         default:
2620                 goto out_unparsable;
2621         }
2622
2623         p = xdr_inline_decode(xdr, sizeof(*p));
2624         if (!p)
2625                 goto out_unparsable;
2626         switch (*p++) {
2627         case rpc_autherr_rejectedcred:
2628         case rpc_autherr_rejectedverf:
2629         case rpcsec_gsserr_credproblem:
2630         case rpcsec_gsserr_ctxproblem:
2631                 if (!task->tk_cred_retry)
2632                         break;
2633                 task->tk_cred_retry--;
2634                 trace_rpc__stale_creds(task);
2635                 return -EKEYREJECTED;
2636         case rpc_autherr_badcred:
2637         case rpc_autherr_badverf:
2638                 /* possibly garbled cred/verf? */
2639                 if (!task->tk_garb_retry)
2640                         break;
2641                 task->tk_garb_retry--;
2642                 trace_rpc__bad_creds(task);
2643                 task->tk_action = call_encode;
2644                 return -EAGAIN;
2645         case rpc_autherr_tooweak:
2646                 trace_rpc__auth_tooweak(task);
2647                 pr_warn("RPC: server %s requires stronger authentication.\n",
2648                         task->tk_xprt->servername);
2649                 break;
2650         default:
2651                 goto out_unparsable;
2652         }
2653         goto out_err;
2654 }
2655
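/*
 * The NULL procedure encodes no arguments and decodes no results; it is
 * used by rpc_ping() and the transport tests below.
 */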
2656 static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2657                 const void *obj)
2658 {
2659 }
2660
2661 static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2662                 void *obj)
2663 {
2664         return 0;
2665 }
2666
2667 static const struct rpc_procinfo rpcproc_null = {
2668         .p_encode = rpcproc_encode_null,
2669         .p_decode = rpcproc_decode_null,
2670 };
2671
2672 static int rpc_ping(struct rpc_clnt *clnt)
2673 {
2674         struct rpc_message msg = {
2675                 .rpc_proc = &rpcproc_null,
2676         };
2677         int err;
2678         err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2679                             RPC_TASK_NULLCREDS);
2680         return err;
2681 }
2682
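/*
 * Start a NULL call, optionally bound to a specific transport, with
 * soft, soft-connect and null-credential semantics forced on.
 */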
2683 static
2684 struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2685                 struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2686                 const struct rpc_call_ops *ops, void *data)
2687 {
2688         struct rpc_message msg = {
2689                 .rpc_proc = &rpcproc_null,
2690         };
2691         struct rpc_task_setup task_setup_data = {
2692                 .rpc_client = clnt,
2693                 .rpc_xprt = xprt,
2694                 .rpc_message = &msg,
2695                 .rpc_op_cred = cred,
2696                 .callback_ops = (ops != NULL) ? ops : &rpc_default_ops,
2697                 .callback_data = data,
2698                 .flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2699                          RPC_TASK_NULLCREDS,
2700         };
2701
2702         return rpc_run_task(&task_setup_data);
2703 }
2704
2705 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2706 {
2707         return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2708 }
2709 EXPORT_SYMBOL_GPL(rpc_call_null);
2710
2711 struct rpc_cb_add_xprt_calldata {
2712         struct rpc_xprt_switch *xps;
2713         struct rpc_xprt *xprt;
2714 };
2715
2716 static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2717 {
2718         struct rpc_cb_add_xprt_calldata *data = calldata;
2719
2720         if (task->tk_status == 0)
2721                 rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2722 }
2723
2724 static void rpc_cb_add_xprt_release(void *calldata)
2725 {
2726         struct rpc_cb_add_xprt_calldata *data = calldata;
2727
2728         xprt_put(data->xprt);
2729         xprt_switch_put(data->xps);
2730         kfree(data);
2731 }
2732
2733 static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2734         .rpc_call_done = rpc_cb_add_xprt_done,
2735         .rpc_release = rpc_cb_add_xprt_release,
2736 };
2737
2738 /**
2739  * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2740  * @clnt: pointer to struct rpc_clnt
2741  * @xps: pointer to struct rpc_xprt_switch
2742  * @xprt: pointer to struct rpc_xprt
2743  * @dummy: unused
2744  */
2745 int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2746                 struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2747                 void *dummy)
2748 {
2749         struct rpc_cb_add_xprt_calldata *data;
2750         struct rpc_task *task;
2751
2752         data = kmalloc(sizeof(*data), GFP_NOFS);
2753         if (!data)
2754                 return -ENOMEM;
2755         data->xps = xprt_switch_get(xps);
2756         data->xprt = xprt_get(xprt);
2757         if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2758                 rpc_cb_add_xprt_release(data);
2759                 goto success;
2760         }
2761
2762         task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2763                         &rpc_cb_add_xprt_call_ops, data);
2764
2765         rpc_put_task(task);
2766 success:
2767         return 1;
2768 }
2769 EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2770
2771 /**
2772  * rpc_clnt_setup_test_and_add_xprt - test a new transport and add it to clnt
2773  * @clnt: struct rpc_clnt to get the new transport
2774  * @xps:  the rpc_xprt_switch to hold the new transport
2775  * @xprt: the rpc_xprt to test
2776  * @data: a struct rpc_add_xprt_test pointer that holds the test function
2777  *        and test function call data
2778  *
2779  * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
2780  *   1) the caller of the test function must dereference the rpc_xprt_switch
2781  *      and the rpc_xprt.
2782  *   2) the test function must call rpc_xprt_switch_add_xprt, usually in
2783  *      the rpc_call_done routine.
2784  *
2785  * Upon success (return of 1), the test function adds the new
2786  * transport to the rpc_clnt xprt switch.
2787  */
2789 int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2790                                      struct rpc_xprt_switch *xps,
2791                                      struct rpc_xprt *xprt,
2792                                      void *data)
2793 {
2794         struct rpc_task *task;
2795         struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
2796         int status = -EADDRINUSE;
2797
2798         xprt = xprt_get(xprt);
2799         xprt_switch_get(xps);
2800
2801         if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2802                 goto out_err;
2803
2804         /* Test the connection */
2805         task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
2806         if (IS_ERR(task)) {
2807                 status = PTR_ERR(task);
2808                 goto out_err;
2809         }
2810         status = task->tk_status;
2811         rpc_put_task(task);
2812
2813         if (status < 0)
2814                 goto out_err;
2815
2816         /* rpc_xprt_switch and rpc_xprt are dereferenced by add_xprt_test() */
2817         xtest->add_xprt_test(clnt, xprt, xtest->data);
2818
2819         xprt_put(xprt);
2820         xprt_switch_put(xps);
2821
2822         /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2823         return 1;
2824 out_err:
2825         xprt_put(xprt);
2826         xprt_switch_put(xps);
2827         pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
2828                 status, xprt->address_strings[RPC_DISPLAY_ADDR]);
2829         return status;
2830 }
2831 EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
2832
2833 /**
2834  * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2835  * @clnt: pointer to struct rpc_clnt
2836  * @xprtargs: pointer to struct xprt_create
2837  * @setup: callback to test and/or set up the connection
2838  * @data: pointer to setup function data
2839  *
2840  * Creates a new transport using the parameters set in @xprtargs and
2841  * adds it to @clnt.
2842  * If @setup is given, it is called to test the connection and/or set
2843  * it up before the new transport is added.
2844  *
2845  */
2846 int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2847                 struct xprt_create *xprtargs,
2848                 int (*setup)(struct rpc_clnt *,
2849                         struct rpc_xprt_switch *,
2850                         struct rpc_xprt *,
2851                         void *),
2852                 void *data)
2853 {
2854         struct rpc_xprt_switch *xps;
2855         struct rpc_xprt *xprt;
2856         unsigned long connect_timeout;
2857         unsigned long reconnect_timeout;
2858         unsigned char resvport, reuseport;
2859         int ret = 0;
2860
2861         rcu_read_lock();
2862         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2863         xprt = xprt_iter_xprt(&clnt->cl_xpi);
2864         if (xps == NULL || xprt == NULL) {
2865                 rcu_read_unlock();
2866                 xprt_switch_put(xps);
2867                 return -EAGAIN;
2868         }
2869         resvport = xprt->resvport;
2870         reuseport = xprt->reuseport;
2871         connect_timeout = xprt->connect_timeout;
2872         reconnect_timeout = xprt->max_reconnect_timeout;
2873         rcu_read_unlock();
2874
2875         xprt = xprt_create_transport(xprtargs);
2876         if (IS_ERR(xprt)) {
2877                 ret = PTR_ERR(xprt);
2878                 goto out_put_switch;
2879         }
2880         xprt->resvport = resvport;
2881         xprt->reuseport = reuseport;
2882         if (xprt->ops->set_connect_timeout != NULL)
2883                 xprt->ops->set_connect_timeout(xprt,
2884                                 connect_timeout,
2885                                 reconnect_timeout);
2886
2887         rpc_xprt_switch_set_roundrobin(xps);
2888         if (setup) {
2889                 ret = setup(clnt, xps, xprt, data);
2890                 if (ret != 0)
2891                         goto out_put_xprt;
2892         }
2893         rpc_xprt_switch_add_xprt(xps, xprt);
2894 out_put_xprt:
2895         xprt_put(xprt);
2896 out_put_switch:
2897         xprt_switch_put(xps);
2898         return ret;
2899 }
2900 EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
2901
2902 struct connect_timeout_data {
2903         unsigned long connect_timeout;
2904         unsigned long reconnect_timeout;
2905 };
2906
2907 static int
2908 rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
2909                 struct rpc_xprt *xprt,
2910                 void *data)
2911 {
2912         struct connect_timeout_data *timeo = data;
2913
2914         if (xprt->ops->set_connect_timeout)
2915                 xprt->ops->set_connect_timeout(xprt,
2916                                 timeo->connect_timeout,
2917                                 timeo->reconnect_timeout);
2918         return 0;
2919 }
2920
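/**
 * rpc_set_connect_timeout - apply connect timeouts to a client's transports
 * @clnt: RPC client to update
 * @connect_timeout: new connect timeout
 * @reconnect_timeout: new maximum reconnect timeout
 *
 * Walks every transport attached to @clnt and updates its connection
 * timeout settings.
 */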
2921 void
2922 rpc_set_connect_timeout(struct rpc_clnt *clnt,
2923                 unsigned long connect_timeout,
2924                 unsigned long reconnect_timeout)
2925 {
2926         struct connect_timeout_data timeout = {
2927                 .connect_timeout = connect_timeout,
2928                 .reconnect_timeout = reconnect_timeout,
2929         };
2930         rpc_clnt_iterate_for_each_xprt(clnt,
2931                         rpc_xprt_set_connect_timeout,
2932                         &timeout);
2933 }
2934 EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
2935
2936 void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
2937 {
2938         rcu_read_lock();
2939         xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2940         rcu_read_unlock();
2941 }
2942 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
2943
2944 void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
2945 {
2946         rcu_read_lock();
2947         rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
2948                                  xprt);
2949         rcu_read_unlock();
2950 }
2951 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
2952
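/**
 * rpc_clnt_xprt_switch_has_addr - Check if an address is already in use
 * @clnt: pointer to struct rpc_clnt
 * @sap: socket address to look for
 *
 * Returns true if one of the client's existing transports matches @sap.
 */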
2953 bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
2954                                    const struct sockaddr *sap)
2955 {
2956         struct rpc_xprt_switch *xps;
2957         bool ret;
2958
2959         rcu_read_lock();
2960         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
2961         ret = rpc_xprt_switch_has_addr(xps, sap);
2962         rcu_read_unlock();
2963         return ret;
2964 }
2965 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
2966
2967 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
2968 static void rpc_show_header(void)
2969 {
2970         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2971                 "-timeout ---ops--\n");
2972 }
2973
2974 static void rpc_show_task(const struct rpc_clnt *clnt,
2975                           const struct rpc_task *task)
2976 {
2977         const char *rpc_waitq = "none";
2978
2979         if (RPC_IS_QUEUED(task))
2980                 rpc_waitq = rpc_qname(task->tk_waitqueue);
2981
2982         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2983                 task->tk_pid, task->tk_flags, task->tk_status,
2984                 clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
2985                 clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
2986                 task->tk_action, rpc_waitq);
2987 }
2988
2989 void rpc_show_tasks(struct net *net)
2990 {
2991         struct rpc_clnt *clnt;
2992         struct rpc_task *task;
2993         int header = 0;
2994         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2995
2996         spin_lock(&sn->rpc_client_lock);
2997         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2998                 spin_lock(&clnt->cl_lock);
2999                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3000                         if (!header) {
3001                                 rpc_show_header();
3002                                 header++;
3003                         }
3004                         rpc_show_task(clnt, task);
3005                 }
3006                 spin_unlock(&clnt->cl_lock);
3007         }
3008         spin_unlock(&sn->rpc_client_lock);
3009 }
3010 #endif
3011
3012 #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
3013 static int
3014 rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
3015                 struct rpc_xprt *xprt,
3016                 void *dummy)
3017 {
3018         return xprt_enable_swap(xprt);
3019 }
3020
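/*
 * Mark @clnt as backing swap I/O; the first activation enables swap
 * handling on each of the client's transports.
 */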
3021 int
3022 rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3023 {
3024         if (atomic_inc_return(&clnt->cl_swapper) == 1)
3025                 return rpc_clnt_iterate_for_each_xprt(clnt,
3026                                 rpc_clnt_swap_activate_callback, NULL);
3027         return 0;
3028 }
3029 EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3030
3031 static int
3032 rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
3033                 struct rpc_xprt *xprt,
3034                 void *dummy)
3035 {
3036         xprt_disable_swap(xprt);
3037         return 0;
3038 }
3039
3040 void
3041 rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3042 {
3043         if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3044                 rpc_clnt_iterate_for_each_xprt(clnt,
3045                                 rpc_clnt_swap_deactivate_callback, NULL);
3046 }
3047 EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
3048 #endif /* CONFIG_SUNRPC_SWAP */