1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  *  linux/net/sunrpc/clnt.c
4  *
5  *  This file contains the high-level RPC interface.
6  *  It is modeled as a finite state machine to support both synchronous
7  *  and asynchronous requests.
8  *
9  *  -   RPC header generation and argument serialization.
10  *  -   Credential refresh.
11  *  -   TCP connect handling.
12  *  -   Retry of operation when it is suspected the operation failed because
13  *      of uid squashing on the server, or when the credentials were stale
14  *      and need to be refreshed, or when a packet was damaged in transit.
15  *      This may have to be moved to the VFS layer.
16  *
17  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
18  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
19  */
20
21
22 #include <linux/module.h>
23 #include <linux/types.h>
24 #include <linux/kallsyms.h>
25 #include <linux/mm.h>
26 #include <linux/namei.h>
27 #include <linux/mount.h>
28 #include <linux/slab.h>
29 #include <linux/rcupdate.h>
30 #include <linux/utsname.h>
31 #include <linux/workqueue.h>
32 #include <linux/in.h>
33 #include <linux/in6.h>
34 #include <linux/un.h>
35
36 #include <linux/sunrpc/clnt.h>
37 #include <linux/sunrpc/addr.h>
38 #include <linux/sunrpc/rpc_pipe_fs.h>
39 #include <linux/sunrpc/metrics.h>
40 #include <linux/sunrpc/bc_xprt.h>
41 #include <trace/events/sunrpc.h>
42
43 #include "sunrpc.h"
44 #include "sysfs.h"
45 #include "netns.h"
46
47 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
48 # define RPCDBG_FACILITY        RPCDBG_CALL
49 #endif
50
51 /*
52  * destroy_wait is woken whenever a client's task list becomes empty.
53  */
54
55 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
56
57
58 static void     call_start(struct rpc_task *task);
59 static void     call_reserve(struct rpc_task *task);
60 static void     call_reserveresult(struct rpc_task *task);
61 static void     call_allocate(struct rpc_task *task);
62 static void     call_encode(struct rpc_task *task);
63 static void     call_decode(struct rpc_task *task);
64 static void     call_bind(struct rpc_task *task);
65 static void     call_bind_status(struct rpc_task *task);
66 static void     call_transmit(struct rpc_task *task);
67 static void     call_status(struct rpc_task *task);
68 static void     call_transmit_status(struct rpc_task *task);
69 static void     call_refresh(struct rpc_task *task);
70 static void     call_refreshresult(struct rpc_task *task);
71 static void     call_connect(struct rpc_task *task);
72 static void     call_connect_status(struct rpc_task *task);
73
74 static int      rpc_encode_header(struct rpc_task *task,
75                                   struct xdr_stream *xdr);
76 static int      rpc_decode_header(struct rpc_task *task,
77                                   struct xdr_stream *xdr);
78 static int      rpc_ping(struct rpc_clnt *clnt);
79 static void     rpc_check_timeout(struct rpc_task *task);
80
81 static void rpc_register_client(struct rpc_clnt *clnt)
82 {
83         struct net *net = rpc_net_ns(clnt);
84         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
85
86         spin_lock(&sn->rpc_client_lock);
87         list_add(&clnt->cl_clients, &sn->all_clients);
88         spin_unlock(&sn->rpc_client_lock);
89 }
90
91 static void rpc_unregister_client(struct rpc_clnt *clnt)
92 {
93         struct net *net = rpc_net_ns(clnt);
94         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
95
96         spin_lock(&sn->rpc_client_lock);
97         list_del(&clnt->cl_clients);
98         spin_unlock(&sn->rpc_client_lock);
99 }
100
101 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
102 {
103         rpc_remove_client_dir(clnt);
104 }
105
106 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
107 {
108         struct net *net = rpc_net_ns(clnt);
109         struct super_block *pipefs_sb;
110
111         pipefs_sb = rpc_get_sb_net(net);
112         if (pipefs_sb) {
113                 __rpc_clnt_remove_pipedir(clnt);
114                 rpc_put_sb_net(net);
115         }
116 }
117
118 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
119                                     struct rpc_clnt *clnt)
120 {
121         static uint32_t clntid;
122         const char *dir_name = clnt->cl_program->pipe_dir_name;
123         char name[15];
124         struct dentry *dir, *dentry;
125
126         dir = rpc_d_lookup_sb(sb, dir_name);
127         if (dir == NULL) {
128                 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
129                 return dir;
130         }
131         for (;;) {
132                 snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
133                 name[sizeof(name) - 1] = '\0';
134                 dentry = rpc_create_client_dir(dir, name, clnt);
135                 if (!IS_ERR(dentry))
136                         break;
137                 if (dentry == ERR_PTR(-EEXIST))
138                         continue;
139                 printk(KERN_INFO "RPC: Couldn't create pipefs entry"
140                                 " %s/%s, error %ld\n",
141                                 dir_name, name, PTR_ERR(dentry));
142                 break;
143         }
144         dput(dir);
145         return dentry;
146 }
147
148 static int
149 rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
150 {
151         struct dentry *dentry;
152
153         if (clnt->cl_program->pipe_dir_name != NULL) {
154                 dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
155                 if (IS_ERR(dentry))
156                         return PTR_ERR(dentry);
157         }
158         return 0;
159 }
160
161 static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
162 {
163         if (clnt->cl_program->pipe_dir_name == NULL)
164                 return 1;
165
166         switch (event) {
167         case RPC_PIPEFS_MOUNT:
168                 if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
169                         return 1;
170                 if (refcount_read(&clnt->cl_count) == 0)
171                         return 1;
172                 break;
173         case RPC_PIPEFS_UMOUNT:
174                 if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
175                         return 1;
176                 break;
177         }
178         return 0;
179 }
180
181 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
182                                    struct super_block *sb)
183 {
184         struct dentry *dentry;
185
186         switch (event) {
187         case RPC_PIPEFS_MOUNT:
188                 dentry = rpc_setup_pipedir_sb(sb, clnt);
189                 if (!dentry)
190                         return -ENOENT;
191                 if (IS_ERR(dentry))
192                         return PTR_ERR(dentry);
193                 break;
194         case RPC_PIPEFS_UMOUNT:
195                 __rpc_clnt_remove_pipedir(clnt);
196                 break;
197         default:
198                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
199                 return -ENOTSUPP;
200         }
201         return 0;
202 }
203
204 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
205                                 struct super_block *sb)
206 {
207         int error = 0;
208
209         for (;; clnt = clnt->cl_parent) {
210                 if (!rpc_clnt_skip_event(clnt, event))
211                         error = __rpc_clnt_handle_event(clnt, event, sb);
212                 if (error || clnt == clnt->cl_parent)
213                         break;
214         }
215         return error;
216 }
217
218 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
219 {
220         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
221         struct rpc_clnt *clnt;
222
223         spin_lock(&sn->rpc_client_lock);
224         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
225                 if (rpc_clnt_skip_event(clnt, event))
226                         continue;
227                 spin_unlock(&sn->rpc_client_lock);
228                 return clnt;
229         }
230         spin_unlock(&sn->rpc_client_lock);
231         return NULL;
232 }
233
234 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
235                             void *ptr)
236 {
237         struct super_block *sb = ptr;
238         struct rpc_clnt *clnt;
239         int error = 0;
240
241         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
242                 error = __rpc_pipefs_event(clnt, event, sb);
243                 if (error)
244                         break;
245         }
246         return error;
247 }
248
249 static struct notifier_block rpc_clients_block = {
250         .notifier_call  = rpc_pipefs_event,
251         .priority       = SUNRPC_PIPEFS_RPC_PRIO,
252 };
253
254 int rpc_clients_notifier_register(void)
255 {
256         return rpc_pipefs_notifier_register(&rpc_clients_block);
257 }
258
259 void rpc_clients_notifier_unregister(void)
260 {
261         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
262 }
263
264 static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
265                 struct rpc_xprt *xprt,
266                 const struct rpc_timeout *timeout)
267 {
268         struct rpc_xprt *old;
269
270         spin_lock(&clnt->cl_lock);
271         old = rcu_dereference_protected(clnt->cl_xprt,
272                         lockdep_is_held(&clnt->cl_lock));
273
274         if (!xprt_bound(xprt))
275                 clnt->cl_autobind = 1;
276
277         clnt->cl_timeout = timeout;
278         rcu_assign_pointer(clnt->cl_xprt, xprt);
279         spin_unlock(&clnt->cl_lock);
280
281         return old;
282 }
283
284 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
285 {
286         clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
287                         nodename, sizeof(clnt->cl_nodename));
288 }
289
290 static int rpc_client_register(struct rpc_clnt *clnt,
291                                rpc_authflavor_t pseudoflavor,
292                                const char *client_name)
293 {
294         struct rpc_auth_create_args auth_args = {
295                 .pseudoflavor = pseudoflavor,
296                 .target_name = client_name,
297         };
298         struct rpc_auth *auth;
299         struct net *net = rpc_net_ns(clnt);
300         struct super_block *pipefs_sb;
301         int err;
302
303         rpc_clnt_debugfs_register(clnt);
304
305         pipefs_sb = rpc_get_sb_net(net);
306         if (pipefs_sb) {
307                 err = rpc_setup_pipedir(pipefs_sb, clnt);
308                 if (err)
309                         goto out;
310         }
311
312         rpc_register_client(clnt);
313         if (pipefs_sb)
314                 rpc_put_sb_net(net);
315
316         auth = rpcauth_create(&auth_args, clnt);
317         if (IS_ERR(auth)) {
318                 dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
319                                 pseudoflavor);
320                 err = PTR_ERR(auth);
321                 goto err_auth;
322         }
323         return 0;
324 err_auth:
325         pipefs_sb = rpc_get_sb_net(net);
326         rpc_unregister_client(clnt);
327         __rpc_clnt_remove_pipedir(clnt);
328 out:
329         if (pipefs_sb)
330                 rpc_put_sb_net(net);
331         rpc_sysfs_client_destroy(clnt);
332         rpc_clnt_debugfs_unregister(clnt);
333         return err;
334 }
335
336 static DEFINE_IDA(rpc_clids);
337
338 void rpc_cleanup_clids(void)
339 {
340         ida_destroy(&rpc_clids);
341 }
342
343 static int rpc_alloc_clid(struct rpc_clnt *clnt)
344 {
345         int clid;
346
347         clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
348         if (clid < 0)
349                 return clid;
350         clnt->cl_clid = clid;
351         return 0;
352 }
353
354 static void rpc_free_clid(struct rpc_clnt *clnt)
355 {
356         ida_simple_remove(&rpc_clids, clnt->cl_clid);
357 }
358
359 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
360                 struct rpc_xprt_switch *xps,
361                 struct rpc_xprt *xprt,
362                 struct rpc_clnt *parent)
363 {
364         const struct rpc_program *program = args->program;
365         const struct rpc_version *version;
366         struct rpc_clnt *clnt = NULL;
367         const struct rpc_timeout *timeout;
368         const char *nodename = args->nodename;
369         int err;
370
371         err = rpciod_up();
372         if (err)
373                 goto out_no_rpciod;
374
375         err = -EINVAL;
376         if (args->version >= program->nrvers)
377                 goto out_err;
378         version = program->version[args->version];
379         if (version == NULL)
380                 goto out_err;
381
382         err = -ENOMEM;
383         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
384         if (!clnt)
385                 goto out_err;
386         clnt->cl_parent = parent ? : clnt;
387
388         err = rpc_alloc_clid(clnt);
389         if (err)
390                 goto out_no_clid;
391
392         clnt->cl_cred     = get_cred(args->cred);
393         clnt->cl_procinfo = version->procs;
394         clnt->cl_maxproc  = version->nrprocs;
395         clnt->cl_prog     = args->prognumber ? : program->number;
396         clnt->cl_vers     = version->number;
397         clnt->cl_stats    = program->stats;
398         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
399         rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
400         err = -ENOMEM;
401         if (clnt->cl_metrics == NULL)
402                 goto out_no_stats;
403         clnt->cl_program  = program;
404         INIT_LIST_HEAD(&clnt->cl_tasks);
405         spin_lock_init(&clnt->cl_lock);
406
407         timeout = xprt->timeout;
408         if (args->timeout != NULL) {
409                 memcpy(&clnt->cl_timeout_default, args->timeout,
410                                 sizeof(clnt->cl_timeout_default));
411                 timeout = &clnt->cl_timeout_default;
412         }
413
414         rpc_clnt_set_transport(clnt, xprt, timeout);
415         xprt->main = true;
416         xprt_iter_init(&clnt->cl_xpi, xps);
417         xprt_switch_put(xps);
418
419         clnt->cl_rtt = &clnt->cl_rtt_default;
420         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
421
422         refcount_set(&clnt->cl_count, 1);
423
424         if (nodename == NULL)
425                 nodename = utsname()->nodename;
426         /* save the nodename */
427         rpc_clnt_set_nodename(clnt, nodename);
428
429         rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
430         err = rpc_client_register(clnt, args->authflavor, args->client_name);
431         if (err)
432                 goto out_no_path;
433         if (parent)
434                 refcount_inc(&parent->cl_count);
435
436         trace_rpc_clnt_new(clnt, xprt, program->name, args->servername);
437         return clnt;
438
439 out_no_path:
440         rpc_free_iostats(clnt->cl_metrics);
441 out_no_stats:
442         put_cred(clnt->cl_cred);
443         rpc_free_clid(clnt);
444 out_no_clid:
445         kfree(clnt);
446 out_err:
447         rpciod_down();
448 out_no_rpciod:
449         xprt_switch_put(xps);
450         xprt_put(xprt);
451         trace_rpc_clnt_new_err(program->name, args->servername, err);
452         return ERR_PTR(err);
453 }
454
455 static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
456                                         struct rpc_xprt *xprt)
457 {
458         struct rpc_clnt *clnt = NULL;
459         struct rpc_xprt_switch *xps;
460
461         if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
462                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
463                 xps = args->bc_xprt->xpt_bc_xps;
464                 xprt_switch_get(xps);
465         } else {
466                 xps = xprt_switch_alloc(xprt, GFP_KERNEL);
467                 if (xps == NULL) {
468                         xprt_put(xprt);
469                         return ERR_PTR(-ENOMEM);
470                 }
471                 if (xprt->bc_xprt) {
472                         xprt_switch_get(xps);
473                         xprt->bc_xprt->xpt_bc_xps = xps;
474                 }
475         }
476         clnt = rpc_new_client(args, xps, xprt, NULL);
477         if (IS_ERR(clnt))
478                 return clnt;
479
480         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
481                 int err = rpc_ping(clnt);
482                 if (err != 0) {
483                         rpc_shutdown_client(clnt);
484                         return ERR_PTR(err);
485                 }
486         }
487
488         clnt->cl_softrtry = 1;
489         if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
490                 clnt->cl_softrtry = 0;
491                 if (args->flags & RPC_CLNT_CREATE_SOFTERR)
492                         clnt->cl_softerr = 1;
493         }
494
495         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
496                 clnt->cl_autobind = 1;
497         if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
498                 clnt->cl_noretranstimeo = 1;
499         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
500                 clnt->cl_discrtry = 1;
501         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
502                 clnt->cl_chatty = 1;
503
504         return clnt;
505 }
506
507 /**
508  * rpc_create - create an RPC client and transport with one call
509  * @args: rpc_clnt create argument structure
510  *
511  * Creates and initializes an RPC transport and an RPC client.
512  *
513  * It can ping the server in order to determine if it is up, and to see if
514  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
515  * this behavior so asynchronous tasks can also use rpc_create.
516  */
517 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
518 {
519         struct rpc_xprt *xprt;
520         struct xprt_create xprtargs = {
521                 .net = args->net,
522                 .ident = args->protocol,
523                 .srcaddr = args->saddress,
524                 .dstaddr = args->address,
525                 .addrlen = args->addrsize,
526                 .servername = args->servername,
527                 .bc_xprt = args->bc_xprt,
528         };
529         char servername[48];
530         struct rpc_clnt *clnt;
531         int i;
532
533         if (args->bc_xprt) {
534                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
535                 xprt = args->bc_xprt->xpt_bc_xprt;
536                 if (xprt) {
537                         xprt_get(xprt);
538                         return rpc_create_xprt(args, xprt);
539                 }
540         }
541
542         if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
543                 xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
544         if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
545                 xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
546         /*
547          * If the caller chooses not to specify a hostname, whip
548          * up a string representation of the passed-in address.
549          */
550         if (xprtargs.servername == NULL) {
551                 struct sockaddr_un *sun =
552                                 (struct sockaddr_un *)args->address;
553                 struct sockaddr_in *sin =
554                                 (struct sockaddr_in *)args->address;
555                 struct sockaddr_in6 *sin6 =
556                                 (struct sockaddr_in6 *)args->address;
557
558                 servername[0] = '\0';
559                 switch (args->address->sa_family) {
560                 case AF_LOCAL:
561                         snprintf(servername, sizeof(servername), "%s",
562                                  sun->sun_path);
563                         break;
564                 case AF_INET:
565                         snprintf(servername, sizeof(servername), "%pI4",
566                                  &sin->sin_addr.s_addr);
567                         break;
568                 case AF_INET6:
569                         snprintf(servername, sizeof(servername), "%pI6",
570                                  &sin6->sin6_addr);
571                         break;
572                 default:
573                         /* caller wants default server name, but
574                          * address family isn't recognized. */
575                         return ERR_PTR(-EINVAL);
576                 }
577                 xprtargs.servername = servername;
578         }
579
580         xprt = xprt_create_transport(&xprtargs);
581         if (IS_ERR(xprt))
582                 return (struct rpc_clnt *)xprt;
583
584         /*
585          * By default, kernel RPC client connects from a reserved port.
586          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
587          * but it is always enabled for rpciod, which handles the connect
588          * operation.
589          */
590         xprt->resvport = 1;
591         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
592                 xprt->resvport = 0;
593         xprt->reuseport = 0;
594         if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
595                 xprt->reuseport = 1;
596
597         clnt = rpc_create_xprt(args, xprt);
598         if (IS_ERR(clnt) || args->nconnect <= 1)
599                 return clnt;
600
601         for (i = 0; i < args->nconnect - 1; i++) {
602                 if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
603                         break;
604         }
605         return clnt;
606 }
607 EXPORT_SYMBOL_GPL(rpc_create);
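
/*
 * Minimal usage sketch for rpc_create().  The helper name, the program
 * definition, the XPRT_TRANSPORT_TCP ident and the RPC_AUTH_UNIX flavor are
 * illustrative assumptions supplied by a hypothetical caller; the
 * rpc_create_args fields and RPC_CLNT_CREATE_NOPING flag are as used above.
 */
static struct rpc_clnt *example_create_client(struct net *net,
					      struct sockaddr *sap, size_t salen,
					      const struct rpc_program *example_prog)
{
	struct rpc_create_args args = {
		.net		= net,
		.protocol	= XPRT_TRANSPORT_TCP,
		.address	= sap,
		.addrsize	= salen,
		.servername	= NULL,	/* rpc_create() builds one from the address */
		.program	= example_prog,
		.version	= 0,	/* index into example_prog->version[] */
		.authflavor	= RPC_AUTH_UNIX,
		.flags		= RPC_CLNT_CREATE_NOPING,
	};

	/* Returns an ERR_PTR on failure; pair with rpc_shutdown_client(). */
	return rpc_create(&args);
}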
608
609 /*
610  * This function clones the RPC client structure. It allows us to share the
611  * same transport while varying parameters such as the authentication
612  * flavour.
613  */
614 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
615                                            struct rpc_clnt *clnt)
616 {
617         struct rpc_xprt_switch *xps;
618         struct rpc_xprt *xprt;
619         struct rpc_clnt *new;
620         int err;
621
622         err = -ENOMEM;
623         rcu_read_lock();
624         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
625         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
626         rcu_read_unlock();
627         if (xprt == NULL || xps == NULL) {
628                 xprt_put(xprt);
629                 xprt_switch_put(xps);
630                 goto out_err;
631         }
632         args->servername = xprt->servername;
633         args->nodename = clnt->cl_nodename;
634
635         new = rpc_new_client(args, xps, xprt, clnt);
636         if (IS_ERR(new))
637                 return new;
638
639         /* Turn off autobind on clones */
640         new->cl_autobind = 0;
641         new->cl_softrtry = clnt->cl_softrtry;
642         new->cl_softerr = clnt->cl_softerr;
643         new->cl_noretranstimeo = clnt->cl_noretranstimeo;
644         new->cl_discrtry = clnt->cl_discrtry;
645         new->cl_chatty = clnt->cl_chatty;
646         new->cl_principal = clnt->cl_principal;
647         return new;
648
649 out_err:
650         trace_rpc_clnt_clone_err(clnt, err);
651         return ERR_PTR(err);
652 }
653
654 /**
655  * rpc_clone_client - Clone an RPC client structure
656  *
657  * @clnt: RPC client whose parameters are copied
658  *
659  * Returns a fresh RPC client or an ERR_PTR.
660  */
661 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
662 {
663         struct rpc_create_args args = {
664                 .program        = clnt->cl_program,
665                 .prognumber     = clnt->cl_prog,
666                 .version        = clnt->cl_vers,
667                 .authflavor     = clnt->cl_auth->au_flavor,
668                 .cred           = clnt->cl_cred,
669         };
670         return __rpc_clone_client(&args, clnt);
671 }
672 EXPORT_SYMBOL_GPL(rpc_clone_client);
673
674 /**
675  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
676  *
677  * @clnt: RPC client whose parameters are copied
678  * @flavor: security flavor for new client
679  *
680  * Returns a fresh RPC client or an ERR_PTR.
681  */
682 struct rpc_clnt *
683 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
684 {
685         struct rpc_create_args args = {
686                 .program        = clnt->cl_program,
687                 .prognumber     = clnt->cl_prog,
688                 .version        = clnt->cl_vers,
689                 .authflavor     = flavor,
690                 .cred           = clnt->cl_cred,
691         };
692         return __rpc_clone_client(&args, clnt);
693 }
694 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
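
/*
 * Sketch: reuse an existing client's transport under a different security
 * flavor.  The wrapper name and the RPC_AUTH_GSS choice are illustrative
 * assumptions; the call itself is rpc_clone_client_set_auth() as documented
 * above.
 */
static struct rpc_clnt *example_clone_with_gss(struct rpc_clnt *parent)
{
	/* e.g. a krb5-protected variant of the same mount */
	return rpc_clone_client_set_auth(parent, RPC_AUTH_GSS);
}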
695
696 /**
697  * rpc_switch_client_transport: switch the RPC transport on the fly
698  * @clnt: pointer to a struct rpc_clnt
699  * @args: pointer to the new transport arguments
700  * @timeout: pointer to the new timeout parameters
701  *
702  * This function allows the caller to switch the RPC transport for the
703  * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
704  * server, for instance.  It assumes that the caller has ensured that
705  * there are no active RPC tasks by using some form of locking.
706  *
707  * Returns zero if "clnt" is now using the new xprt.  Otherwise a
708  * negative errno is returned, and "clnt" continues to use the old
709  * xprt.
710  */
711 int rpc_switch_client_transport(struct rpc_clnt *clnt,
712                 struct xprt_create *args,
713                 const struct rpc_timeout *timeout)
714 {
715         const struct rpc_timeout *old_timeo;
716         rpc_authflavor_t pseudoflavor;
717         struct rpc_xprt_switch *xps, *oldxps;
718         struct rpc_xprt *xprt, *old;
719         struct rpc_clnt *parent;
720         int err;
721
722         xprt = xprt_create_transport(args);
723         if (IS_ERR(xprt))
724                 return PTR_ERR(xprt);
725
726         xps = xprt_switch_alloc(xprt, GFP_KERNEL);
727         if (xps == NULL) {
728                 xprt_put(xprt);
729                 return -ENOMEM;
730         }
731
732         pseudoflavor = clnt->cl_auth->au_flavor;
733
734         old_timeo = clnt->cl_timeout;
735         old = rpc_clnt_set_transport(clnt, xprt, timeout);
736         oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
737
738         rpc_unregister_client(clnt);
739         __rpc_clnt_remove_pipedir(clnt);
740         rpc_sysfs_client_destroy(clnt);
741         rpc_clnt_debugfs_unregister(clnt);
742
743         /*
744          * A new transport was created.  "clnt" therefore
745          * becomes the root of a new cl_parent tree.  clnt's
746          * children, if it has any, still point to the old xprt.
747          */
748         parent = clnt->cl_parent;
749         clnt->cl_parent = clnt;
750
751         /*
752          * The old rpc_auth cache cannot be re-used.  GSS
753          * contexts in particular are between a single
754          * client and server.
755          */
756         err = rpc_client_register(clnt, pseudoflavor, NULL);
757         if (err)
758                 goto out_revert;
759
760         synchronize_rcu();
761         if (parent != clnt)
762                 rpc_release_client(parent);
763         xprt_switch_put(oldxps);
764         xprt_put(old);
765         trace_rpc_clnt_replace_xprt(clnt);
766         return 0;
767
768 out_revert:
769         xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
770         rpc_clnt_set_transport(clnt, old, old_timeo);
771         clnt->cl_parent = parent;
772         rpc_client_register(clnt, pseudoflavor, NULL);
773         xprt_switch_put(xps);
774         xprt_put(xprt);
775         trace_rpc_clnt_replace_xprt_err(clnt);
776         return err;
777 }
778 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
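
/*
 * Sketch of a transport switch, e.g. failing over to a mirrored server.
 * The wrapper name, the new address parameters and the XPRT_TRANSPORT_TCP
 * ident are illustrative assumptions; as the kernel-doc above notes, the
 * caller must have quiesced all RPC tasks before calling this.
 */
static int example_switch_server(struct rpc_clnt *clnt,
				 struct sockaddr *new_sap, size_t new_salen,
				 const char *new_servername,
				 const struct rpc_timeout *timeo)
{
	struct xprt_create xprtargs = {
		.ident		= XPRT_TRANSPORT_TCP,
		.net		= rpc_net_ns(clnt),
		.dstaddr	= new_sap,
		.addrlen	= new_salen,
		.servername	= new_servername,
	};

	return rpc_switch_client_transport(clnt, &xprtargs, timeo);
}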
779
780 static
781 int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
782 {
783         struct rpc_xprt_switch *xps;
784
785         rcu_read_lock();
786         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
787         rcu_read_unlock();
788         if (xps == NULL)
789                 return -EAGAIN;
790         xprt_iter_init_listall(xpi, xps);
791         xprt_switch_put(xps);
792         return 0;
793 }
794
795 /**
796  * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
797  * @clnt: pointer to client
798  * @fn: function to apply
799  * @data: void pointer to function data
800  *
801  * Iterates through the list of RPC transports currently attached to the
802  * client and applies the function fn(clnt, xprt, data).
803  *
804  * On error, the iteration stops, and the function returns the error value.
805  */
806 int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
807                 int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
808                 void *data)
809 {
810         struct rpc_xprt_iter xpi;
811         int ret;
812
813         ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
814         if (ret)
815                 return ret;
816         for (;;) {
817                 struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
818
819                 if (!xprt)
820                         break;
821                 ret = fn(clnt, xprt, data);
822                 xprt_put(xprt);
823                 if (ret < 0)
824                         break;
825         }
826         xprt_iter_destroy(&xpi);
827         return ret;
828 }
829 EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
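
/*
 * Sketch of an iterator callback for rpc_clnt_iterate_for_each_xprt().
 * The callback and counter names are illustrative; a negative return value
 * stops the iteration, as described above.
 */
static int example_count_xprts(struct rpc_clnt *clnt, struct rpc_xprt *xprt,
			       void *data)
{
	unsigned int *count = data;

	(*count)++;
	return 0;
}

/*
 * Typical call site:
 *
 *	unsigned int count = 0;
 *
 *	rpc_clnt_iterate_for_each_xprt(clnt, example_count_xprts, &count);
 */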
830
831 /*
832  * Kill all tasks for the given client.
833  * XXX: kill their descendants as well?
834  */
835 void rpc_killall_tasks(struct rpc_clnt *clnt)
836 {
837         struct rpc_task *rovr;
838
839
840         if (list_empty(&clnt->cl_tasks))
841                 return;
842
843         /*
844          * Hold cl_lock so the cl_tasks list cannot change while we signal.
845          */
846         trace_rpc_clnt_killall(clnt);
847         spin_lock(&clnt->cl_lock);
848         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
849                 rpc_signal_task(rovr);
850         spin_unlock(&clnt->cl_lock);
851 }
852 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
853
854 /*
855  * Properly shut down an RPC client, terminating all outstanding
856  * requests.
857  */
858 void rpc_shutdown_client(struct rpc_clnt *clnt)
859 {
860         might_sleep();
861
862         trace_rpc_clnt_shutdown(clnt);
863
864         while (!list_empty(&clnt->cl_tasks)) {
865                 rpc_killall_tasks(clnt);
866                 wait_event_timeout(destroy_wait,
867                         list_empty(&clnt->cl_tasks), 1*HZ);
868         }
869
870         rpc_release_client(clnt);
871 }
872 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
873
874 /*
875  * Free an RPC client
876  */
877 static void rpc_free_client_work(struct work_struct *work)
878 {
879         struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
880
881         trace_rpc_clnt_free(clnt);
882
883         /* These might block on processes that might allocate memory,
884          * so they cannot be called in rpciod; they are handled separately
885          * here.
886          */
887         rpc_sysfs_client_destroy(clnt);
888         rpc_clnt_debugfs_unregister(clnt);
889         rpc_free_clid(clnt);
890         rpc_clnt_remove_pipedir(clnt);
891         xprt_put(rcu_dereference_raw(clnt->cl_xprt));
892
893         kfree(clnt);
894         rpciod_down();
895 }
896 static struct rpc_clnt *
897 rpc_free_client(struct rpc_clnt *clnt)
898 {
899         struct rpc_clnt *parent = NULL;
900
901         trace_rpc_clnt_release(clnt);
902         if (clnt->cl_parent != clnt)
903                 parent = clnt->cl_parent;
904         rpc_unregister_client(clnt);
905         rpc_free_iostats(clnt->cl_metrics);
906         clnt->cl_metrics = NULL;
907         xprt_iter_destroy(&clnt->cl_xpi);
908         put_cred(clnt->cl_cred);
909
910         INIT_WORK(&clnt->cl_work, rpc_free_client_work);
911         schedule_work(&clnt->cl_work);
912         return parent;
913 }
914
915 /*
916  * Free an RPC client
917  */
918 static struct rpc_clnt *
919 rpc_free_auth(struct rpc_clnt *clnt)
920 {
921         /*
922          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
923          *       release remaining GSS contexts. This mechanism ensures
924          *       that it can do so safely.
925          */
926         if (clnt->cl_auth != NULL) {
927                 rpcauth_release(clnt->cl_auth);
928                 clnt->cl_auth = NULL;
929         }
930         if (refcount_dec_and_test(&clnt->cl_count))
931                 return rpc_free_client(clnt);
932         return NULL;
933 }
934
935 /*
936  * Release reference to the RPC client
937  */
938 void
939 rpc_release_client(struct rpc_clnt *clnt)
940 {
941         do {
942                 if (list_empty(&clnt->cl_tasks))
943                         wake_up(&destroy_wait);
944                 if (refcount_dec_not_one(&clnt->cl_count))
945                         break;
946                 clnt = rpc_free_auth(clnt);
947         } while (clnt != NULL);
948 }
949 EXPORT_SYMBOL_GPL(rpc_release_client);
950
951 /**
952  * rpc_bind_new_program - bind a new RPC program to an existing client
953  * @old: old rpc_client
954  * @program: rpc program to set
955  * @vers: rpc program version
956  *
957  * Clones the rpc client and sets up a new RPC program. This is mainly
958  * of use for enabling different RPC programs to share the same transport.
959  * The Sun NFSv2/v3 ACL protocol can do this.
960  */
961 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
962                                       const struct rpc_program *program,
963                                       u32 vers)
964 {
965         struct rpc_create_args args = {
966                 .program        = program,
967                 .prognumber     = program->number,
968                 .version        = vers,
969                 .authflavor     = old->cl_auth->au_flavor,
970                 .cred           = old->cl_cred,
971         };
972         struct rpc_clnt *clnt;
973         int err;
974
975         clnt = __rpc_clone_client(&args, old);
976         if (IS_ERR(clnt))
977                 goto out;
978         err = rpc_ping(clnt);
979         if (err != 0) {
980                 rpc_shutdown_client(clnt);
981                 clnt = ERR_PTR(err);
982         }
983 out:
984         return clnt;
985 }
986 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
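
/*
 * Sketch of sharing one transport between two RPC programs, in the spirit
 * of the NFSv3 ACL case mentioned above.  "example_acl_program" and the
 * version number are hypothetical placeholders for a real rpc_program
 * definition owned by the caller.
 */
static struct rpc_clnt *example_bind_acl_program(struct rpc_clnt *nfs_client,
						 const struct rpc_program *example_acl_program)
{
	return rpc_bind_new_program(nfs_client, example_acl_program, 3);
}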
987
988 struct rpc_xprt *
989 rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
990 {
991         struct rpc_xprt_switch *xps;
992
993         if (!xprt)
994                 return NULL;
995         rcu_read_lock();
996         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
997         atomic_long_inc(&xps->xps_queuelen);
998         rcu_read_unlock();
999         atomic_long_inc(&xprt->queuelen);
1000
1001         return xprt;
1002 }
1003
1004 static void
1005 rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1006 {
1007         struct rpc_xprt_switch *xps;
1008
1009         atomic_long_dec(&xprt->queuelen);
1010         rcu_read_lock();
1011         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1012         atomic_long_dec(&xps->xps_queuelen);
1013         rcu_read_unlock();
1014
1015         xprt_put(xprt);
1016 }
1017
1018 void rpc_task_release_transport(struct rpc_task *task)
1019 {
1020         struct rpc_xprt *xprt = task->tk_xprt;
1021
1022         if (xprt) {
1023                 task->tk_xprt = NULL;
1024                 if (task->tk_client)
1025                         rpc_task_release_xprt(task->tk_client, xprt);
1026                 else
1027                         xprt_put(xprt);
1028         }
1029 }
1030 EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1031
1032 void rpc_task_release_client(struct rpc_task *task)
1033 {
1034         struct rpc_clnt *clnt = task->tk_client;
1035
1036         rpc_task_release_transport(task);
1037         if (clnt != NULL) {
1038                 /* Remove from client task list */
1039                 spin_lock(&clnt->cl_lock);
1040                 list_del(&task->tk_task);
1041                 spin_unlock(&clnt->cl_lock);
1042                 task->tk_client = NULL;
1043
1044                 rpc_release_client(clnt);
1045         }
1046 }
1047
1048 static struct rpc_xprt *
1049 rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1050 {
1051         struct rpc_xprt *xprt;
1052
1053         rcu_read_lock();
1054         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1055         rcu_read_unlock();
1056         return rpc_task_get_xprt(clnt, xprt);
1057 }
1058
1059 static struct rpc_xprt *
1060 rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1061 {
1062         return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1063 }
1064
1065 static
1066 void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1067 {
1068         if (task->tk_xprt &&
1069                         !(test_bit(XPRT_OFFLINE, &task->tk_xprt->state) &&
1070                         (task->tk_flags & RPC_TASK_MOVEABLE)))
1071                 return;
1072         if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1073                 task->tk_xprt = rpc_task_get_first_xprt(clnt);
1074         else
1075                 task->tk_xprt = rpc_task_get_next_xprt(clnt);
1076 }
1077
1078 static
1079 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1080 {
1081         rpc_task_set_transport(task, clnt);
1082         task->tk_client = clnt;
1083         refcount_inc(&clnt->cl_count);
1084         if (clnt->cl_softrtry)
1085                 task->tk_flags |= RPC_TASK_SOFT;
1086         if (clnt->cl_softerr)
1087                 task->tk_flags |= RPC_TASK_TIMEOUT;
1088         if (clnt->cl_noretranstimeo)
1089                 task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1090         /* Add to the client's list of all tasks */
1091         spin_lock(&clnt->cl_lock);
1092         list_add_tail(&task->tk_task, &clnt->cl_tasks);
1093         spin_unlock(&clnt->cl_lock);
1094 }
1095
1096 static void
1097 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1098 {
1099         if (msg != NULL) {
1100                 task->tk_msg.rpc_proc = msg->rpc_proc;
1101                 task->tk_msg.rpc_argp = msg->rpc_argp;
1102                 task->tk_msg.rpc_resp = msg->rpc_resp;
1103                 task->tk_msg.rpc_cred = msg->rpc_cred;
1104                 if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1105                         get_cred(task->tk_msg.rpc_cred);
1106         }
1107 }
1108
1109 /*
1110  * Default callback for async RPC calls
1111  */
1112 static void
1113 rpc_default_callback(struct rpc_task *task, void *data)
1114 {
1115 }
1116
1117 static const struct rpc_call_ops rpc_default_ops = {
1118         .rpc_call_done = rpc_default_callback,
1119 };
1120
1121 /**
1122  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1123  * @task_setup_data: pointer to task initialisation data
1124  */
1125 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1126 {
1127         struct rpc_task *task;
1128
1129         task = rpc_new_task(task_setup_data);
1130
1131         if (!RPC_IS_ASYNC(task))
1132                 task->tk_flags |= RPC_TASK_CRED_NOREF;
1133
1134         rpc_task_set_client(task, task_setup_data->rpc_client);
1135         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1136
1137         if (task->tk_action == NULL)
1138                 rpc_call_start(task);
1139
1140         atomic_inc(&task->tk_count);
1141         rpc_execute(task);
1142         return task;
1143 }
1144 EXPORT_SYMBOL_GPL(rpc_run_task);
1145
1146 /**
1147  * rpc_call_sync - Perform a synchronous RPC call
1148  * @clnt: pointer to RPC client
1149  * @msg: RPC call parameters
1150  * @flags: RPC call flags
1151  */
1152 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1153 {
1154         struct rpc_task *task;
1155         struct rpc_task_setup task_setup_data = {
1156                 .rpc_client = clnt,
1157                 .rpc_message = msg,
1158                 .callback_ops = &rpc_default_ops,
1159                 .flags = flags,
1160         };
1161         int status;
1162
1163         WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1164         if (flags & RPC_TASK_ASYNC) {
1165                 rpc_release_calldata(task_setup_data.callback_ops,
1166                         task_setup_data.callback_data);
1167                 return -EINVAL;
1168         }
1169
1170         task = rpc_run_task(&task_setup_data);
1171         if (IS_ERR(task))
1172                 return PTR_ERR(task);
1173         status = task->tk_status;
1174         rpc_put_task(task);
1175         return status;
1176 }
1177 EXPORT_SYMBOL_GPL(rpc_call_sync);
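
/*
 * Sketch of a synchronous call.  The procedure entry and the argument and
 * result pointers are hypothetical; a real caller takes them from its own
 * rpc_procinfo table and XDR types.  RPC_TASK_SOFT makes the call time out
 * rather than retry forever.
 */
static int example_call_sync(struct rpc_clnt *clnt,
			     const struct rpc_procinfo *example_proc,
			     void *argp, void *resp)
{
	struct rpc_message msg = {
		.rpc_proc	= example_proc,
		.rpc_argp	= argp,
		.rpc_resp	= resp,
	};

	return rpc_call_sync(clnt, &msg, RPC_TASK_SOFT);
}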
1178
1179 /**
1180  * rpc_call_async - Perform an asynchronous RPC call
1181  * @clnt: pointer to RPC client
1182  * @msg: RPC call parameters
1183  * @flags: RPC call flags
1184  * @tk_ops: RPC call ops
1185  * @data: user call data
1186  */
1187 int
1188 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1189                const struct rpc_call_ops *tk_ops, void *data)
1190 {
1191         struct rpc_task *task;
1192         struct rpc_task_setup task_setup_data = {
1193                 .rpc_client = clnt,
1194                 .rpc_message = msg,
1195                 .callback_ops = tk_ops,
1196                 .callback_data = data,
1197                 .flags = flags|RPC_TASK_ASYNC,
1198         };
1199
1200         task = rpc_run_task(&task_setup_data);
1201         if (IS_ERR(task))
1202                 return PTR_ERR(task);
1203         rpc_put_task(task);
1204         return 0;
1205 }
1206 EXPORT_SYMBOL_GPL(rpc_call_async);
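
/*
 * Sketch of an asynchronous call.  The callback, release hook and wrapper
 * names are illustrative assumptions: rpc_call_done runs from rpciod once
 * the reply (or a fatal error) arrives, and rpc_release is invoked when the
 * task drops its calldata.
 */
static void example_async_done(struct rpc_task *task, void *calldata)
{
	if (task->tk_status < 0)
		pr_warn("example async RPC failed: %d\n", task->tk_status);
}

static void example_async_release(void *calldata)
{
	kfree(calldata);
}

static const struct rpc_call_ops example_async_ops = {
	.rpc_call_done	= example_async_done,
	.rpc_release	= example_async_release,
};

static int example_call_async(struct rpc_clnt *clnt,
			      const struct rpc_message *msg, void *calldata)
{
	return rpc_call_async(clnt, msg, RPC_TASK_SOFT, &example_async_ops,
			      calldata);
}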
1207
1208 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1209 static void call_bc_encode(struct rpc_task *task);
1210
1211 /**
1212  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1213  * rpc_execute against it
1214  * @req: RPC request
1215  */
1216 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
1217 {
1218         struct rpc_task *task;
1219         struct rpc_task_setup task_setup_data = {
1220                 .callback_ops = &rpc_default_ops,
1221                 .flags = RPC_TASK_SOFTCONN |
1222                         RPC_TASK_NO_RETRANS_TIMEOUT,
1223         };
1224
1225         dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1226         /*
1227          * Create an rpc_task to send the data
1228          */
1229         task = rpc_new_task(&task_setup_data);
1230         xprt_init_bc_request(req, task);
1231
1232         task->tk_action = call_bc_encode;
1233         atomic_inc(&task->tk_count);
1234         WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1235         rpc_execute(task);
1236
1237         dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1238         return task;
1239 }
1240 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1241
1242 /**
1243  * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1244  * @req: RPC request to prepare
1245  * @pages: vector of struct page pointers
1246  * @base: offset in first page where receive should start, in bytes
1247  * @len: expected size of the upper layer data payload, in bytes
1248  * @hdrsize: expected size of upper layer reply header, in XDR words
1249  *
1250  */
1251 void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1252                              unsigned int base, unsigned int len,
1253                              unsigned int hdrsize)
1254 {
1255         hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1256
1257         xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1258         trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1259 }
1260 EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
1261
1262 void
1263 rpc_call_start(struct rpc_task *task)
1264 {
1265         task->tk_action = call_start;
1266 }
1267 EXPORT_SYMBOL_GPL(rpc_call_start);
1268
1269 /**
1270  * rpc_peeraddr - extract remote peer address from clnt's xprt
1271  * @clnt: RPC client structure
1272  * @buf: target buffer
1273  * @bufsize: length of target buffer
1274  *
1275  * Returns the number of bytes that are actually in the stored address.
1276  */
1277 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1278 {
1279         size_t bytes;
1280         struct rpc_xprt *xprt;
1281
1282         rcu_read_lock();
1283         xprt = rcu_dereference(clnt->cl_xprt);
1284
1285         bytes = xprt->addrlen;
1286         if (bytes > bufsize)
1287                 bytes = bufsize;
1288         memcpy(buf, &xprt->addr, bytes);
1289         rcu_read_unlock();
1290
1291         return bytes;
1292 }
1293 EXPORT_SYMBOL_GPL(rpc_peeraddr);
1294
1295 /**
1296  * rpc_peeraddr2str - return remote peer address in printable format
1297  * @clnt: RPC client structure
1298  * @format: address format
1299  *
1300  * NB: the lifetime of the memory referenced by the returned pointer is
1301  * the same as the rpc_xprt itself.  As long as the caller uses this
1302  * pointer, it must hold the RCU read lock.
1303  */
1304 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1305                              enum rpc_display_format_t format)
1306 {
1307         struct rpc_xprt *xprt;
1308
1309         xprt = rcu_dereference(clnt->cl_xprt);
1310
1311         if (xprt->address_strings[format] != NULL)
1312                 return xprt->address_strings[format];
1313         else
1314                 return "unprintable";
1315 }
1316 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
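
/*
 * Sketch of safe use of rpc_peeraddr2str(): the returned string is only
 * valid under the RCU read lock, so copy it out before dropping the lock.
 * The helper name and buffer handling are illustrative.
 */
static void example_copy_peer_address(struct rpc_clnt *clnt,
				      char *buf, size_t buflen)
{
	rcu_read_lock();
	strscpy(buf, rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR), buflen);
	rcu_read_unlock();
}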
1317
1318 static const struct sockaddr_in rpc_inaddr_loopback = {
1319         .sin_family             = AF_INET,
1320         .sin_addr.s_addr        = htonl(INADDR_ANY),
1321 };
1322
1323 static const struct sockaddr_in6 rpc_in6addr_loopback = {
1324         .sin6_family            = AF_INET6,
1325         .sin6_addr              = IN6ADDR_ANY_INIT,
1326 };
1327
1328 /*
1329  * Try a getsockname() on a connected datagram socket.  Using a
1330  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1331  * This conserves the ephemeral port number space.
1332  *
1333  * Returns zero and fills in "buf" if successful; otherwise, a
1334  * negative errno is returned.
1335  */
1336 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1337                         struct sockaddr *buf)
1338 {
1339         struct socket *sock;
1340         int err;
1341
1342         err = __sock_create(net, sap->sa_family,
1343                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1344         if (err < 0) {
1345                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1346                 goto out;
1347         }
1348
1349         switch (sap->sa_family) {
1350         case AF_INET:
1351                 err = kernel_bind(sock,
1352                                 (struct sockaddr *)&rpc_inaddr_loopback,
1353                                 sizeof(rpc_inaddr_loopback));
1354                 break;
1355         case AF_INET6:
1356                 err = kernel_bind(sock,
1357                                 (struct sockaddr *)&rpc_in6addr_loopback,
1358                                 sizeof(rpc_in6addr_loopback));
1359                 break;
1360         default:
1361                 err = -EAFNOSUPPORT;
1362                 goto out;
1363         }
1364         if (err < 0) {
1365                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1366                 goto out_release;
1367         }
1368
1369         err = kernel_connect(sock, sap, salen, 0);
1370         if (err < 0) {
1371                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1372                 goto out_release;
1373         }
1374
1375         err = kernel_getsockname(sock, buf);
1376         if (err < 0) {
1377                 dprintk("RPC:       getsockname failed (%d)\n", err);
1378                 goto out_release;
1379         }
1380
1381         err = 0;
1382         if (buf->sa_family == AF_INET6) {
1383                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1384                 sin6->sin6_scope_id = 0;
1385         }
1386         dprintk("RPC:       %s succeeded\n", __func__);
1387
1388 out_release:
1389         sock_release(sock);
1390 out:
1391         return err;
1392 }
1393
1394 /*
1395  * Scraping a connected socket failed, so we don't have a usable
1396  * local address.  Fallback: generate an address that will prevent
1397  * the server from calling us back.
1398  *
1399  * Returns zero and fills in "buf" if successful; otherwise, a
1400  * negative errno is returned.
1401  */
1402 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1403 {
1404         switch (family) {
1405         case AF_INET:
1406                 if (buflen < sizeof(rpc_inaddr_loopback))
1407                         return -EINVAL;
1408                 memcpy(buf, &rpc_inaddr_loopback,
1409                                 sizeof(rpc_inaddr_loopback));
1410                 break;
1411         case AF_INET6:
1412                 if (buflen < sizeof(rpc_in6addr_loopback))
1413                         return -EINVAL;
1414                 memcpy(buf, &rpc_in6addr_loopback,
1415                                 sizeof(rpc_in6addr_loopback));
1416                 break;
1417         default:
1418                 dprintk("RPC:       %s: address family not supported\n",
1419                         __func__);
1420                 return -EAFNOSUPPORT;
1421         }
1422         dprintk("RPC:       %s: succeeded\n", __func__);
1423         return 0;
1424 }
1425
1426 /**
1427  * rpc_localaddr - discover local endpoint address for an RPC client
1428  * @clnt: RPC client structure
1429  * @buf: target buffer
1430  * @buflen: size of target buffer, in bytes
1431  *
1432  * Returns zero and fills in "buf" and "buflen" if successful;
1433  * otherwise, a negative errno is returned.
1434  *
1435  * This works even if the underlying transport is not currently connected,
1436  * or if the upper layer never previously provided a source address.
1437  *
1438  * The result of this function call is transient: multiple calls in
1439  * succession may give different results, depending on how local
1440  * networking configuration changes over time.
1441  */
1442 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1443 {
1444         struct sockaddr_storage address;
1445         struct sockaddr *sap = (struct sockaddr *)&address;
1446         struct rpc_xprt *xprt;
1447         struct net *net;
1448         size_t salen;
1449         int err;
1450
1451         rcu_read_lock();
1452         xprt = rcu_dereference(clnt->cl_xprt);
1453         salen = xprt->addrlen;
1454         memcpy(sap, &xprt->addr, salen);
1455         net = get_net(xprt->xprt_net);
1456         rcu_read_unlock();
1457
1458         rpc_set_port(sap, 0);
1459         err = rpc_sockname(net, sap, salen, buf);
1460         put_net(net);
1461         if (err != 0)
1462                 /* Couldn't discover local address, return ANYADDR */
1463                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1464         return 0;
1465 }
1466 EXPORT_SYMBOL_GPL(rpc_localaddr);
1467
1468 void
1469 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1470 {
1471         struct rpc_xprt *xprt;
1472
1473         rcu_read_lock();
1474         xprt = rcu_dereference(clnt->cl_xprt);
1475         if (xprt->ops->set_buffer_size)
1476                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1477         rcu_read_unlock();
1478 }
1479 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1480
1481 /**
1482  * rpc_net_ns - Get the network namespace for this RPC client
1483  * @clnt: RPC client to query
1484  *
1485  */
1486 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1487 {
1488         struct net *ret;
1489
1490         rcu_read_lock();
1491         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1492         rcu_read_unlock();
1493         return ret;
1494 }
1495 EXPORT_SYMBOL_GPL(rpc_net_ns);
1496
1497 /**
1498  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1499  * @clnt: RPC client to query
1500  *
1501  * For stream transports, this is one RPC record fragment (see RFC
1502  * 1831), as we don't support multi-record requests yet.  For datagram
1503  * transports, this is the size of an IP packet minus the IP, UDP, and
1504  * RPC header sizes.
1505  */
1506 size_t rpc_max_payload(struct rpc_clnt *clnt)
1507 {
1508         size_t ret;
1509
1510         rcu_read_lock();
1511         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1512         rcu_read_unlock();
1513         return ret;
1514 }
1515 EXPORT_SYMBOL_GPL(rpc_max_payload);
1516
1517 /**
1518  * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1519  * @clnt: RPC client to query
1520  */
1521 size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1522 {
1523         struct rpc_xprt *xprt;
1524         size_t ret;
1525
1526         rcu_read_lock();
1527         xprt = rcu_dereference(clnt->cl_xprt);
1528         ret = xprt->ops->bc_maxpayload(xprt);
1529         rcu_read_unlock();
1530         return ret;
1531 }
1532 EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1533
1534 unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1535 {
1536         struct rpc_xprt *xprt;
1537         unsigned int ret;
1538
1539         rcu_read_lock();
1540         xprt = rcu_dereference(clnt->cl_xprt);
1541         ret = xprt->ops->bc_num_slots(xprt);
1542         rcu_read_unlock();
1543         return ret;
1544 }
1545 EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1546
1547 /**
1548  * rpc_force_rebind - force transport to check that remote port is unchanged
1549  * @clnt: client to rebind
1550  *
1551  */
1552 void rpc_force_rebind(struct rpc_clnt *clnt)
1553 {
1554         if (clnt->cl_autobind) {
1555                 rcu_read_lock();
1556                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1557                 rcu_read_unlock();
1558         }
1559 }
1560 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1561
1562 static int
1563 __rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1564 {
1565         task->tk_status = 0;
1566         task->tk_rpc_status = 0;
1567         task->tk_action = action;
1568         return 1;
1569 }
1570
1571 /*
1572  * Restart an (async) RPC call. Usually called from within the
1573  * exit handler.
1574  */
1575 int
1576 rpc_restart_call(struct rpc_task *task)
1577 {
1578         return __rpc_restart_call(task, call_start);
1579 }
1580 EXPORT_SYMBOL_GPL(rpc_restart_call);
1581
1582 /*
1583  * Restart an (async) RPC call from the call_prepare state.
1584  * Usually called from within the exit handler.
1585  */
1586 int
1587 rpc_restart_call_prepare(struct rpc_task *task)
1588 {
1589         if (task->tk_ops->rpc_call_prepare != NULL)
1590                 return __rpc_restart_call(task, rpc_prepare_task);
1591         return rpc_restart_call(task);
1592 }
1593 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1594
1595 const char
1596 *rpc_proc_name(const struct rpc_task *task)
1597 {
1598         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1599
1600         if (proc) {
1601                 if (proc->p_name)
1602                         return proc->p_name;
1603                 else
1604                         return "NULL";
1605         } else
1606                 return "no proc";
1607 }
1608
1609 static void
1610 __rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
1611 {
1612         trace_rpc_call_rpcerror(task, tk_status, rpc_status);
1613         task->tk_rpc_status = rpc_status;
1614         rpc_exit(task, tk_status);
1615 }
1616
1617 static void
1618 rpc_call_rpcerror(struct rpc_task *task, int status)
1619 {
1620         __rpc_call_rpcerror(task, status, status);
1621 }
1622
1623 /*
1624  * 0.  Initial state
1625  *
1626  *     Other FSM states can be visited zero or more times, but
1627  *     this state is visited exactly once for each RPC.
1628  */
1629 static void
1630 call_start(struct rpc_task *task)
1631 {
1632         struct rpc_clnt *clnt = task->tk_client;
1633         int idx = task->tk_msg.rpc_proc->p_statidx;
1634
1635         trace_rpc_request(task);
1636
1637         /* Increment call count (version might not be valid for ping) */
1638         if (clnt->cl_program->version[clnt->cl_vers])
1639                 clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1640         clnt->cl_stats->rpccnt++;
1641         task->tk_action = call_reserve;
1642         rpc_task_set_transport(task, clnt);
1643 }
1644
1645 /*
1646  * 1.   Reserve an RPC call slot
1647  */
1648 static void
1649 call_reserve(struct rpc_task *task)
1650 {
1651         task->tk_status  = 0;
1652         task->tk_action  = call_reserveresult;
1653         xprt_reserve(task);
1654 }
1655
1656 static void call_retry_reserve(struct rpc_task *task);
1657
1658 /*
1659  * 1b.  Grok the result of xprt_reserve()
1660  */
1661 static void
1662 call_reserveresult(struct rpc_task *task)
1663 {
1664         int status = task->tk_status;
1665
1666         /*
1667          * After a call to xprt_reserve(), we must have either
1668          * a request slot or else an error status.
1669          */
1670         task->tk_status = 0;
1671         if (status >= 0) {
1672                 if (task->tk_rqstp) {
1673                         task->tk_action = call_refresh;
1674                         return;
1675                 }
1676
1677                 rpc_call_rpcerror(task, -EIO);
1678                 return;
1679         }
1680
1681         switch (status) {
1682         case -ENOMEM:
1683                 rpc_delay(task, HZ >> 2);
1684                 fallthrough;
1685         case -EAGAIN:   /* woken up; retry */
1686                 task->tk_action = call_retry_reserve;
1687                 return;
1688         default:
1689                 rpc_call_rpcerror(task, status);
1690         }
1691 }
1692
1693 /*
1694  * 1c.  Retry reserving an RPC call slot
1695  */
1696 static void
1697 call_retry_reserve(struct rpc_task *task)
1698 {
1699         task->tk_status  = 0;
1700         task->tk_action  = call_reserveresult;
1701         xprt_retry_reserve(task);
1702 }
1703
1704 /*
1705  * 2.   Bind and/or refresh the credentials
1706  */
1707 static void
1708 call_refresh(struct rpc_task *task)
1709 {
1710         task->tk_action = call_refreshresult;
1711         task->tk_status = 0;
1712         task->tk_client->cl_stats->rpcauthrefresh++;
1713         rpcauth_refreshcred(task);
1714 }
1715
1716 /*
1717  * 2a.  Process the results of a credential refresh
1718  */
1719 static void
1720 call_refreshresult(struct rpc_task *task)
1721 {
1722         int status = task->tk_status;
1723
1724         task->tk_status = 0;
1725         task->tk_action = call_refresh;
1726         switch (status) {
1727         case 0:
1728                 if (rpcauth_uptodatecred(task)) {
1729                         task->tk_action = call_allocate;
1730                         return;
1731                 }
1732                 /* Use rate-limiting and a max number of retries if refresh
1733                  * had status 0 but failed to update the cred.
1734                  */
1735                 fallthrough;
1736         case -ETIMEDOUT:
1737                 rpc_delay(task, 3*HZ);
1738                 fallthrough;
1739         case -EAGAIN:
1740                 status = -EACCES;
1741                 fallthrough;
1742         case -EKEYEXPIRED:
1743                 if (!task->tk_cred_retry)
1744                         break;
1745                 task->tk_cred_retry--;
1746                 trace_rpc_retry_refresh_status(task);
1747                 return;
1748         case -ENOMEM:
1749                 rpc_delay(task, HZ >> 4);
1750                 return;
1751         }
1752         trace_rpc_refresh_status(task);
1753         rpc_call_rpcerror(task, status);
1754 }
1755
1756 /*
1757  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1758  *      (Note: buffer memory is freed in xprt_release).
1759  */
1760 static void
1761 call_allocate(struct rpc_task *task)
1762 {
1763         const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1764         struct rpc_rqst *req = task->tk_rqstp;
1765         struct rpc_xprt *xprt = req->rq_xprt;
1766         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1767         int status;
1768
1769         task->tk_status = 0;
1770         task->tk_action = call_encode;
1771
1772         if (req->rq_buffer)
1773                 return;
1774
1775         if (proc->p_proc != 0) {
1776                 BUG_ON(proc->p_arglen == 0);
1777                 if (proc->p_decode != NULL)
1778                         BUG_ON(proc->p_replen == 0);
1779         }
1780
1781         /*
1782          * Calculate the size (in quads) of the RPC call
1783          * and reply headers, and convert both values
1784          * to byte sizes.
1785          */
1786         req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1787                            proc->p_arglen;
1788         req->rq_callsize <<= 2;
1789         /*
1790          * Note: the reply buffer must at minimum allocate enough space
1791          * for the 'struct accepted_reply' from RFC5531.
1792          */
1793         req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1794                         max_t(size_t, proc->p_replen, 2);
1795         req->rq_rcvsize <<= 2;
1796
1797         status = xprt->ops->buf_alloc(task);
1798         trace_rpc_buf_alloc(task, status);
1799         if (status == 0)
1800                 return;
1801         if (status != -ENOMEM) {
1802                 rpc_call_rpcerror(task, status);
1803                 return;
1804         }
1805
1806         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1807                 task->tk_action = call_allocate;
1808                 rpc_delay(task, HZ>>4);
1809                 return;
1810         }
1811
1812         rpc_call_rpcerror(task, -ERESTARTSYS);
1813 }
1814
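     /*
      * Return true if the request is not already queued for transmission
      * and either has never been sent or may be retransmitted, i.e. it
      * still needs to be (re-)encoded.
      */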
1815 static int
1816 rpc_task_need_encode(struct rpc_task *task)
1817 {
1818         return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1819                 (!(task->tk_flags & RPC_TASK_SENT) ||
1820                  !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1821                  xprt_request_need_retransmit(task));
1822 }
1823
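     /*
      * Initialise the send and receive buffers, then XDR-encode the RPC
      * call header and wrap the request arguments for transmission.
      */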
1824 static void
1825 rpc_xdr_encode(struct rpc_task *task)
1826 {
1827         struct rpc_rqst *req = task->tk_rqstp;
1828         struct xdr_stream xdr;
1829
1830         xdr_buf_init(&req->rq_snd_buf,
1831                      req->rq_buffer,
1832                      req->rq_callsize);
1833         xdr_buf_init(&req->rq_rcv_buf,
1834                      req->rq_rbuffer,
1835                      req->rq_rcvsize);
1836
1837         req->rq_reply_bytes_recvd = 0;
1838         req->rq_snd_buf.head[0].iov_len = 0;
1839         xdr_init_encode(&xdr, &req->rq_snd_buf,
1840                         req->rq_snd_buf.head[0].iov_base, req);
1841         xdr_free_bvec(&req->rq_snd_buf);
1842         if (rpc_encode_header(task, &xdr))
1843                 return;
1844
1845         task->tk_status = rpcauth_wrap_req(task, &xdr);
1846 }
1847
1848 /*
1849  * 3.   Encode arguments of an RPC call
1850  */
1851 static void
1852 call_encode(struct rpc_task *task)
1853 {
1854         if (!rpc_task_need_encode(task))
1855                 goto out;
1856
1857         /* Dequeue task from the receive queue while we're encoding */
1858         xprt_request_dequeue_xprt(task);
1859         /* Encode here so that rpcsec_gss can use correct sequence number. */
1860         rpc_xdr_encode(task);
1861         /* Did the encode result in an error condition? */
1862         if (task->tk_status != 0) {
1863                 /* Was the error nonfatal? */
1864                 switch (task->tk_status) {
1865                 case -EAGAIN:
1866                 case -ENOMEM:
1867                         rpc_delay(task, HZ >> 4);
1868                         break;
1869                 case -EKEYEXPIRED:
1870                         if (!task->tk_cred_retry) {
1871                                 rpc_exit(task, task->tk_status);
1872                         } else {
1873                                 task->tk_action = call_refresh;
1874                                 task->tk_cred_retry--;
1875                                 trace_rpc_retry_refresh_status(task);
1876                         }
1877                         break;
1878                 default:
1879                         rpc_call_rpcerror(task, task->tk_status);
1880                 }
1881                 return;
1882         }
1883
1884         /* Add task to reply queue before transmission to avoid races */
1885         if (rpc_reply_expected(task))
1886                 xprt_request_enqueue_receive(task);
1887         xprt_request_enqueue_transmit(task);
1888 out:
1889         task->tk_action = call_transmit;
1890         /* Check that the connection is OK */
1891         if (!xprt_bound(task->tk_xprt))
1892                 task->tk_action = call_bind;
1893         else if (!xprt_connected(task->tk_xprt))
1894                 task->tk_action = call_connect;
1895 }
1896
1897 /*
1898  * Helpers to check if the task was already transmitted, and
1899  * to take action when that is the case.
1900  */
1901 static bool
1902 rpc_task_transmitted(struct rpc_task *task)
1903 {
1904         return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1905 }
1906
1907 static void
1908 rpc_task_handle_transmitted(struct rpc_task *task)
1909 {
1910         xprt_end_transmit(task);
1911         task->tk_action = call_transmit_status;
1912 }
1913
1914 /*
1915  * 4.   Get the server port number if not yet set
1916  */
1917 static void
1918 call_bind(struct rpc_task *task)
1919 {
1920         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1921
1922         if (rpc_task_transmitted(task)) {
1923                 rpc_task_handle_transmitted(task);
1924                 return;
1925         }
1926
1927         if (xprt_bound(xprt)) {
1928                 task->tk_action = call_connect;
1929                 return;
1930         }
1931
1932         task->tk_action = call_bind_status;
1933         if (!xprt_prepare_transmit(task))
1934                 return;
1935
1936         xprt->ops->rpcbind(task);
1937 }
1938
1939 /*
1940  * 4a.  Sort out bind result
1941  */
1942 static void
1943 call_bind_status(struct rpc_task *task)
1944 {
1945         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1946         int status = -EIO;
1947
1948         if (rpc_task_transmitted(task)) {
1949                 rpc_task_handle_transmitted(task);
1950                 return;
1951         }
1952
1953         if (task->tk_status >= 0)
1954                 goto out_next;
1955         if (xprt_bound(xprt)) {
1956                 task->tk_status = 0;
1957                 goto out_next;
1958         }
1959
1960         switch (task->tk_status) {
1961         case -ENOMEM:
1962                 rpc_delay(task, HZ >> 2);
1963                 goto retry_timeout;
1964         case -EACCES:
1965                 trace_rpcb_prog_unavail_err(task);
1966                 /* fail immediately if this is an RPC ping */
1967                 if (task->tk_msg.rpc_proc->p_proc == 0) {
1968                         status = -EOPNOTSUPP;
1969                         break;
1970                 }
1971                 if (task->tk_rebind_retry == 0)
1972                         break;
1973                 task->tk_rebind_retry--;
1974                 rpc_delay(task, 3*HZ);
1975                 goto retry_timeout;
1976         case -ENOBUFS:
1977                 rpc_delay(task, HZ >> 2);
1978                 goto retry_timeout;
1979         case -EAGAIN:
1980                 goto retry_timeout;
1981         case -ETIMEDOUT:
1982                 trace_rpcb_timeout_err(task);
1983                 goto retry_timeout;
1984         case -EPFNOSUPPORT:
1985                 /* server doesn't support any rpcbind version we know of */
1986                 trace_rpcb_bind_version_err(task);
1987                 break;
1988         case -EPROTONOSUPPORT:
1989                 trace_rpcb_bind_version_err(task);
1990                 goto retry_timeout;
1991         case -ECONNREFUSED:             /* connection problems */
1992         case -ECONNRESET:
1993         case -ECONNABORTED:
1994         case -ENOTCONN:
1995         case -EHOSTDOWN:
1996         case -ENETDOWN:
1997         case -EHOSTUNREACH:
1998         case -ENETUNREACH:
1999         case -EPIPE:
2000                 trace_rpcb_unreachable_err(task);
2001                 if (!RPC_IS_SOFTCONN(task)) {
2002                         rpc_delay(task, 5*HZ);
2003                         goto retry_timeout;
2004                 }
2005                 status = task->tk_status;
2006                 break;
2007         default:
2008                 trace_rpcb_unrecognized_err(task);
2009         }
2010
2011         rpc_call_rpcerror(task, status);
2012         return;
2013 out_next:
2014         task->tk_action = call_connect;
2015         return;
2016 retry_timeout:
2017         task->tk_status = 0;
2018         task->tk_action = call_bind;
2019         rpc_check_timeout(task);
2020 }
2021
2022 /*
2023  * 4b.  Connect to the RPC server
2024  */
2025 static void
2026 call_connect(struct rpc_task *task)
2027 {
2028         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2029
2030         if (rpc_task_transmitted(task)) {
2031                 rpc_task_handle_transmitted(task);
2032                 return;
2033         }
2034
2035         if (xprt_connected(xprt)) {
2036                 task->tk_action = call_transmit;
2037                 return;
2038         }
2039
2040         task->tk_action = call_connect_status;
2041         if (task->tk_status < 0)
2042                 return;
2043         if (task->tk_flags & RPC_TASK_NOCONNECT) {
2044                 rpc_call_rpcerror(task, -ENOTCONN);
2045                 return;
2046         }
2047         if (!xprt_prepare_transmit(task))
2048                 return;
2049         xprt_connect(task);
2050 }
2051
2052 /*
2053  * 4c.  Sort out connect result
2054  */
2055 static void
2056 call_connect_status(struct rpc_task *task)
2057 {
2058         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2059         struct rpc_clnt *clnt = task->tk_client;
2060         int status = task->tk_status;
2061
2062         if (rpc_task_transmitted(task)) {
2063                 rpc_task_handle_transmitted(task);
2064                 return;
2065         }
2066
2067         trace_rpc_connect_status(task);
2068
2069         if (task->tk_status == 0) {
2070                 clnt->cl_stats->netreconn++;
2071                 goto out_next;
2072         }
2073         if (xprt_connected(xprt)) {
2074                 task->tk_status = 0;
2075                 goto out_next;
2076         }
2077
2078         task->tk_status = 0;
2079         switch (status) {
2080         case -ECONNREFUSED:
2081                 /* A positive refusal suggests a rebind is needed. */
2082                 if (RPC_IS_SOFTCONN(task))
2083                         break;
2084                 if (clnt->cl_autobind) {
2085                         rpc_force_rebind(clnt);
2086                         goto out_retry;
2087                 }
2088                 fallthrough;
2089         case -ECONNRESET:
2090         case -ECONNABORTED:
2091         case -ENETDOWN:
2092         case -ENETUNREACH:
2093         case -EHOSTUNREACH:
2094         case -EPIPE:
2095         case -EPROTO:
2096                 xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
2097                                             task->tk_rqstp->rq_connect_cookie);
2098                 if (RPC_IS_SOFTCONN(task))
2099                         break;
2100                 /* retry with existing socket, after a delay */
2101                 rpc_delay(task, 3*HZ);
2102                 fallthrough;
2103         case -EADDRINUSE:
2104         case -ENOTCONN:
2105         case -EAGAIN:
2106         case -ETIMEDOUT:
2107                 if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
2108                     (task->tk_flags & RPC_TASK_MOVEABLE) &&
2109                     test_bit(XPRT_REMOVE, &xprt->state)) {
2110                         struct rpc_xprt *saved = task->tk_xprt;
2111                         struct rpc_xprt_switch *xps;
2112
2113                         rcu_read_lock();
2114                         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2115                         rcu_read_unlock();
2116                         if (xps->xps_nxprts > 1) {
2117                                 long value;
2118
2119                                 xprt_release(task);
2120                                 value = atomic_long_dec_return(&xprt->queuelen);
2121                                 if (value == 0)
2122                                         rpc_xprt_switch_remove_xprt(xps, saved);
2123                                 xprt_put(saved);
2124                                 task->tk_xprt = NULL;
2125                                 task->tk_action = call_start;
2126                         }
2127                         xprt_switch_put(xps);
2128                         if (!task->tk_xprt)
2129                                 return;
2130                 }
2131                 goto out_retry;
2132         case -ENOBUFS:
2133                 rpc_delay(task, HZ >> 2);
2134                 goto out_retry;
2135         }
2136         rpc_call_rpcerror(task, status);
2137         return;
2138 out_next:
2139         task->tk_action = call_transmit;
2140         return;
2141 out_retry:
2142         /* Check for timeouts before looping back to call_bind */
2143         task->tk_action = call_bind;
2144         rpc_check_timeout(task);
2145 }
2146
2147 /*
2148  * 5.   Transmit the RPC request, and wait for reply
2149  */
2150 static void
2151 call_transmit(struct rpc_task *task)
2152 {
2153         if (rpc_task_transmitted(task)) {
2154                 rpc_task_handle_transmitted(task);
2155                 return;
2156         }
2157
2158         task->tk_action = call_transmit_status;
2159         if (!xprt_prepare_transmit(task))
2160                 return;
2161         task->tk_status = 0;
2162         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2163                 if (!xprt_connected(task->tk_xprt)) {
2164                         task->tk_status = -ENOTCONN;
2165                         return;
2166                 }
2167                 xprt_transmit(task);
2168         }
2169         xprt_end_transmit(task);
2170 }
2171
2172 /*
2173  * 5a.  Handle cleanup after a transmission
2174  */
2175 static void
2176 call_transmit_status(struct rpc_task *task)
2177 {
2178         task->tk_action = call_status;
2179
2180         /*
2181          * Common case: success.  Force the compiler to put this
2182          * test first.
2183          */
2184         if (rpc_task_transmitted(task)) {
2185                 task->tk_status = 0;
2186                 xprt_request_wait_receive(task);
2187                 return;
2188         }
2189
2190         switch (task->tk_status) {
2191         default:
2192                 break;
2193         case -EBADMSG:
2194                 task->tk_status = 0;
2195                 task->tk_action = call_encode;
2196                 break;
2197                 /*
2198                  * Special cases: if we've been waiting on the
2199                  * socket's write_space() callback, or if the
2200                  * socket just returned a connection error,
2201                  * then hold onto the transport lock.
2202                  */
2203         case -ENOBUFS:
2204                 rpc_delay(task, HZ>>2);
2205                 fallthrough;
2206         case -EBADSLT:
2207         case -EAGAIN:
2208                 task->tk_action = call_transmit;
2209                 task->tk_status = 0;
2210                 break;
2211         case -ECONNREFUSED:
2212         case -EHOSTDOWN:
2213         case -ENETDOWN:
2214         case -EHOSTUNREACH:
2215         case -ENETUNREACH:
2216         case -EPERM:
2217                 if (RPC_IS_SOFTCONN(task)) {
2218                         if (!task->tk_msg.rpc_proc->p_proc)
2219                                 trace_xprt_ping(task->tk_xprt,
2220                                                 task->tk_status);
2221                         rpc_call_rpcerror(task, task->tk_status);
2222                         return;
2223                 }
2224                 fallthrough;
2225         case -ECONNRESET:
2226         case -ECONNABORTED:
2227         case -EADDRINUSE:
2228         case -ENOTCONN:
2229         case -EPIPE:
2230                 task->tk_action = call_bind;
2231                 task->tk_status = 0;
2232                 break;
2233         }
2234         rpc_check_timeout(task);
2235 }
2236
2237 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2238 static void call_bc_transmit(struct rpc_task *task);
2239 static void call_bc_transmit_status(struct rpc_task *task);
2240
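     /*
      * Queue the backchannel reply for transmission; unlike the forward
      * channel there is no argument encoding step here.
      */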
2241 static void
2242 call_bc_encode(struct rpc_task *task)
2243 {
2244         xprt_request_enqueue_transmit(task);
2245         task->tk_action = call_bc_transmit;
2246 }
2247
2248 /*
2249  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2250  * addition, disconnect on connectivity errors.
2251  */
2252 static void
2253 call_bc_transmit(struct rpc_task *task)
2254 {
2255         task->tk_action = call_bc_transmit_status;
2256         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2257                 if (!xprt_prepare_transmit(task))
2258                         return;
2259                 task->tk_status = 0;
2260                 xprt_transmit(task);
2261         }
2262         xprt_end_transmit(task);
2263 }
2264
2265 static void
2266 call_bc_transmit_status(struct rpc_task *task)
2267 {
2268         struct rpc_rqst *req = task->tk_rqstp;
2269
2270         if (rpc_task_transmitted(task))
2271                 task->tk_status = 0;
2272
2273         switch (task->tk_status) {
2274         case 0:
2275                 /* Success */
2276         case -ENETDOWN:
2277         case -EHOSTDOWN:
2278         case -EHOSTUNREACH:
2279         case -ENETUNREACH:
2280         case -ECONNRESET:
2281         case -ECONNREFUSED:
2282         case -EADDRINUSE:
2283         case -ENOTCONN:
2284         case -EPIPE:
2285                 break;
2286         case -ENOBUFS:
2287                 rpc_delay(task, HZ>>2);
2288                 fallthrough;
2289         case -EBADSLT:
2290         case -EAGAIN:
2291                 task->tk_status = 0;
2292                 task->tk_action = call_bc_transmit;
2293                 return;
2294         case -ETIMEDOUT:
2295                 /*
2296                  * Problem reaching the server.  Disconnect and let the
2297                  * forechannel reestablish the connection.  The server will
2298                  * have to retransmit the backchannel request and we'll
2299                  * reprocess it.  Since these ops are idempotent, there's no
2300                  * need to cache our reply at this time.
2301                  */
2302                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2303                         "error: %d\n", task->tk_status);
2304                 xprt_conditional_disconnect(req->rq_xprt,
2305                         req->rq_connect_cookie);
2306                 break;
2307         default:
2308                 /*
2309                  * We were unable to reply and will have to drop the
2310                  * request.  The server should reconnect and retransmit.
2311                  */
2312                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2313                         "error: %d\n", task->tk_status);
2314                 break;
2315         }
2316         task->tk_action = rpc_exit_task;
2317 }
2318 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2319
2320 /*
2321  * 6.   Sort out the RPC call status
2322  */
2323 static void
2324 call_status(struct rpc_task *task)
2325 {
2326         struct rpc_clnt *clnt = task->tk_client;
2327         int             status;
2328
2329         if (!task->tk_msg.rpc_proc->p_proc)
2330                 trace_xprt_ping(task->tk_xprt, task->tk_status);
2331
2332         status = task->tk_status;
2333         if (status >= 0) {
2334                 task->tk_action = call_decode;
2335                 return;
2336         }
2337
2338         trace_rpc_call_status(task);
2339         task->tk_status = 0;
2340         switch(status) {
2341         case -EHOSTDOWN:
2342         case -ENETDOWN:
2343         case -EHOSTUNREACH:
2344         case -ENETUNREACH:
2345         case -EPERM:
2346                 if (RPC_IS_SOFTCONN(task))
2347                         goto out_exit;
2348                 /*
2349                  * Delay any retries for 3 seconds, then handle as if it
2350                  * were a timeout.
2351                  */
2352                 rpc_delay(task, 3*HZ);
2353                 fallthrough;
2354         case -ETIMEDOUT:
2355                 break;
2356         case -ECONNREFUSED:
2357         case -ECONNRESET:
2358         case -ECONNABORTED:
2359         case -ENOTCONN:
2360                 rpc_force_rebind(clnt);
2361                 break;
2362         case -EADDRINUSE:
2363                 rpc_delay(task, 3*HZ);
2364                 fallthrough;
2365         case -EPIPE:
2366         case -EAGAIN:
2367                 break;
2368         case -EIO:
2369                 /* shutdown or soft timeout */
2370                 goto out_exit;
2371         default:
2372                 if (clnt->cl_chatty)
2373                         printk("%s: RPC call returned error %d\n",
2374                                clnt->cl_program->name, -status);
2375                 goto out_exit;
2376         }
2377         task->tk_action = call_encode;
2378         if (status != -ECONNRESET && status != -ECONNABORTED)
2379                 rpc_check_timeout(task);
2380         return;
2381 out_exit:
2382         rpc_call_rpcerror(task, status);
2383 }
2384
2385 static bool
2386 rpc_check_connected(const struct rpc_rqst *req)
2387 {
2388         /* No allocated request or transport? return true */
2389         if (!req || !req->rq_xprt)
2390                 return true;
2391         return xprt_connected(req->rq_xprt);
2392 }
2393
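     /*
      * Handle a request that has hit a timeout on the retry path: signalled
      * tasks are terminated, soft tasks may be failed with ETIMEDOUT, and
      * hard tasks warn, force a rebind and invalidate the credential before
      * being retried.
      */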
2394 static void
2395 rpc_check_timeout(struct rpc_task *task)
2396 {
2397         struct rpc_clnt *clnt = task->tk_client;
2398
2399         if (RPC_SIGNALLED(task)) {
2400                 rpc_call_rpcerror(task, -ERESTARTSYS);
2401                 return;
2402         }
2403
2404         if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2405                 return;
2406
2407         trace_rpc_timeout_status(task);
2408         task->tk_timeouts++;
2409
2410         if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2411                 rpc_call_rpcerror(task, -ETIMEDOUT);
2412                 return;
2413         }
2414
2415         if (RPC_IS_SOFT(task)) {
2416                 /*
2417                  * Once a "no retrans timeout" soft task (a.k.a. NFSv4) has
2418                  * been sent, it should time out only if the transport
2419                  * connection gets terminally broken.
2420                  */
2421                 if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2422                     rpc_check_connected(task->tk_rqstp))
2423                         return;
2424
2425                 if (clnt->cl_chatty) {
2426                         pr_notice_ratelimited(
2427                                 "%s: server %s not responding, timed out\n",
2428                                 clnt->cl_program->name,
2429                                 task->tk_xprt->servername);
2430                 }
2431                 if (task->tk_flags & RPC_TASK_TIMEOUT)
2432                         rpc_call_rpcerror(task, -ETIMEDOUT);
2433                 else
2434                         __rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2435                 return;
2436         }
2437
2438         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2439                 task->tk_flags |= RPC_CALL_MAJORSEEN;
2440                 if (clnt->cl_chatty) {
2441                         pr_notice_ratelimited(
2442                                 "%s: server %s not responding, still trying\n",
2443                                 clnt->cl_program->name,
2444                                 task->tk_xprt->servername);
2445                 }
2446         }
2447         rpc_force_rebind(clnt);
2448         /*
2449          * Did our request time out due to an RPCSEC_GSS out-of-sequence
2450          * event? RFC2203 requires the server to drop all such requests.
2451          */
2452         rpcauth_invalcred(task);
2453 }
2454
2455 /*
2456  * 7.   Decode the RPC reply
2457  */
2458 static void
2459 call_decode(struct rpc_task *task)
2460 {
2461         struct rpc_clnt *clnt = task->tk_client;
2462         struct rpc_rqst *req = task->tk_rqstp;
2463         struct xdr_stream xdr;
2464         int err;
2465
2466         if (!task->tk_msg.rpc_proc->p_decode) {
2467                 task->tk_action = rpc_exit_task;
2468                 return;
2469         }
2470
2471         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2472                 if (clnt->cl_chatty) {
2473                         pr_notice_ratelimited("%s: server %s OK\n",
2474                                 clnt->cl_program->name,
2475                                 task->tk_xprt->servername);
2476                 }
2477                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2478         }
2479
2480         /*
2481          * Did we ever call xprt_complete_rqst()? If not, we should assume
2482          * the message is incomplete.
2483          */
2484         err = -EAGAIN;
2485         if (!req->rq_reply_bytes_recvd)
2486                 goto out;
2487
2488         /* Ensure that we see all writes made by xprt_complete_rqst()
2489          * before it changed req->rq_reply_bytes_recvd.
2490          */
2491         smp_rmb();
2492
2493         req->rq_rcv_buf.len = req->rq_private_buf.len;
2494         trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2495
2496         /* Check that the softirq receive buffer is valid */
2497         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2498                                 sizeof(req->rq_rcv_buf)) != 0);
2499
2500         xdr_init_decode(&xdr, &req->rq_rcv_buf,
2501                         req->rq_rcv_buf.head[0].iov_base, req);
2502         err = rpc_decode_header(task, &xdr);
2503 out:
2504         switch (err) {
2505         case 0:
2506                 task->tk_action = rpc_exit_task;
2507                 task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2508                 return;
2509         case -EAGAIN:
2510                 task->tk_status = 0;
2511                 if (task->tk_client->cl_discrtry)
2512                         xprt_conditional_disconnect(req->rq_xprt,
2513                                                     req->rq_connect_cookie);
2514                 task->tk_action = call_encode;
2515                 rpc_check_timeout(task);
2516                 break;
2517         case -EKEYREJECTED:
2518                 task->tk_action = call_reserve;
2519                 rpc_check_timeout(task);
2520                 rpcauth_invalcred(task);
2521                 /* Ensure we obtain a new XID if we retry! */
2522                 xprt_release(task);
2523         }
2524 }
2525
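     /*
      * XDR-encode the fixed part of the RPC call header (XID, direction,
      * RPC version, program, version and procedure) followed by the
      * credential.
      */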
2526 static int
2527 rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2528 {
2529         struct rpc_clnt *clnt = task->tk_client;
2530         struct rpc_rqst *req = task->tk_rqstp;
2531         __be32 *p;
2532         int error;
2533
2534         error = -EMSGSIZE;
2535         p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2536         if (!p)
2537                 goto out_fail;
2538         *p++ = req->rq_xid;
2539         *p++ = rpc_call;
2540         *p++ = cpu_to_be32(RPC_VERSION);
2541         *p++ = cpu_to_be32(clnt->cl_prog);
2542         *p++ = cpu_to_be32(clnt->cl_vers);
2543         *p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2544
2545         error = rpcauth_marshcred(task, xdr);
2546         if (error < 0)
2547                 goto out_fail;
2548         return 0;
2549 out_fail:
2550         trace_rpc_bad_callhdr(task);
2551         rpc_call_rpcerror(task, error);
2552         return error;
2553 }
2554
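     /*
      * Parse the RPC reply header and verifier, translating reply-level and
      * authentication errors into local errno values.
      */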
2555 static noinline int
2556 rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2557 {
2558         struct rpc_clnt *clnt = task->tk_client;
2559         int error;
2560         __be32 *p;
2561
2562         /* RFC-1014 says that the representation of XDR data must be a
2563          * multiple of four bytes
2564          * - if it isn't pointer subtraction in the NFS client may give
2565          * - if it isn't, pointer subtraction in the NFS client may give
2566          */
2567         if (task->tk_rqstp->rq_rcv_buf.len & 3)
2568                 goto out_unparsable;
2569
2570         p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2571         if (!p)
2572                 goto out_unparsable;
2573         p++;    /* skip XID */
2574         if (*p++ != rpc_reply)
2575                 goto out_unparsable;
2576         if (*p++ != rpc_msg_accepted)
2577                 goto out_msg_denied;
2578
2579         error = rpcauth_checkverf(task, xdr);
2580         if (error)
2581                 goto out_verifier;
2582
2583         p = xdr_inline_decode(xdr, sizeof(*p));
2584         if (!p)
2585                 goto out_unparsable;
2586         switch (*p) {
2587         case rpc_success:
2588                 return 0;
2589         case rpc_prog_unavail:
2590                 trace_rpc__prog_unavail(task);
2591                 error = -EPFNOSUPPORT;
2592                 goto out_err;
2593         case rpc_prog_mismatch:
2594                 trace_rpc__prog_mismatch(task);
2595                 error = -EPROTONOSUPPORT;
2596                 goto out_err;
2597         case rpc_proc_unavail:
2598                 trace_rpc__proc_unavail(task);
2599                 error = -EOPNOTSUPP;
2600                 goto out_err;
2601         case rpc_garbage_args:
2602         case rpc_system_err:
2603                 trace_rpc__garbage_args(task);
2604                 error = -EIO;
2605                 break;
2606         default:
2607                 goto out_unparsable;
2608         }
2609
2610 out_garbage:
2611         clnt->cl_stats->rpcgarbage++;
2612         if (task->tk_garb_retry) {
2613                 task->tk_garb_retry--;
2614                 task->tk_action = call_encode;
2615                 return -EAGAIN;
2616         }
2617 out_err:
2618         rpc_call_rpcerror(task, error);
2619         return error;
2620
2621 out_unparsable:
2622         trace_rpc__unparsable(task);
2623         error = -EIO;
2624         goto out_garbage;
2625
2626 out_verifier:
2627         trace_rpc_bad_verifier(task);
2628         goto out_garbage;
2629
2630 out_msg_denied:
2631         error = -EACCES;
2632         p = xdr_inline_decode(xdr, sizeof(*p));
2633         if (!p)
2634                 goto out_unparsable;
2635         switch (*p++) {
2636         case rpc_auth_error:
2637                 break;
2638         case rpc_mismatch:
2639                 trace_rpc__mismatch(task);
2640                 error = -EPROTONOSUPPORT;
2641                 goto out_err;
2642         default:
2643                 goto out_unparsable;
2644         }
2645
2646         p = xdr_inline_decode(xdr, sizeof(*p));
2647         if (!p)
2648                 goto out_unparsable;
2649         switch (*p++) {
2650         case rpc_autherr_rejectedcred:
2651         case rpc_autherr_rejectedverf:
2652         case rpcsec_gsserr_credproblem:
2653         case rpcsec_gsserr_ctxproblem:
2654                 if (!task->tk_cred_retry)
2655                         break;
2656                 task->tk_cred_retry--;
2657                 trace_rpc__stale_creds(task);
2658                 return -EKEYREJECTED;
2659         case rpc_autherr_badcred:
2660         case rpc_autherr_badverf:
2661                 /* possibly garbled cred/verf? */
2662                 if (!task->tk_garb_retry)
2663                         break;
2664                 task->tk_garb_retry--;
2665                 trace_rpc__bad_creds(task);
2666                 task->tk_action = call_encode;
2667                 return -EAGAIN;
2668         case rpc_autherr_tooweak:
2669                 trace_rpc__auth_tooweak(task);
2670                 pr_warn("RPC: server %s requires stronger authentication.\n",
2671                         task->tk_xprt->servername);
2672                 break;
2673         default:
2674                 goto out_unparsable;
2675         }
2676         goto out_err;
2677 }
2678
2679 static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2680                 const void *obj)
2681 {
2682 }
2683
2684 static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2685                 void *obj)
2686 {
2687         return 0;
2688 }
2689
2690 static const struct rpc_procinfo rpcproc_null = {
2691         .p_encode = rpcproc_encode_null,
2692         .p_decode = rpcproc_decode_null,
2693 };
2694
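     /*
      * NULL requests are used to ping the server, so clear the
      * "no retransmit timeout" behaviour to ensure they are retried on a
      * normal timeout.
      */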
2695 static void
2696 rpc_null_call_prepare(struct rpc_task *task, void *data)
2697 {
2698         task->tk_flags &= ~RPC_TASK_NO_RETRANS_TIMEOUT;
2699         rpc_call_start(task);
2700 }
2701
2702 static const struct rpc_call_ops rpc_null_ops = {
2703         .rpc_call_prepare = rpc_null_call_prepare,
2704         .rpc_call_done = rpc_default_callback,
2705 };
2706
2707 static
2708 struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2709                 struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2710                 const struct rpc_call_ops *ops, void *data)
2711 {
2712         struct rpc_message msg = {
2713                 .rpc_proc = &rpcproc_null,
2714         };
2715         struct rpc_task_setup task_setup_data = {
2716                 .rpc_client = clnt,
2717                 .rpc_xprt = xprt,
2718                 .rpc_message = &msg,
2719                 .rpc_op_cred = cred,
2720                 .callback_ops = ops ?: &rpc_null_ops,
2721                 .callback_data = data,
2722                 .flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2723                          RPC_TASK_NULLCREDS,
2724         };
2725
2726         return rpc_run_task(&task_setup_data);
2727 }
2728
2729 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2730 {
2731         return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2732 }
2733 EXPORT_SYMBOL_GPL(rpc_call_null);
2734
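     /*
      * Issue a synchronous NULL request to check that the server is
      * reachable and responding.
      */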
2735 static int rpc_ping(struct rpc_clnt *clnt)
2736 {
2737         struct rpc_task *task;
2738         int status;
2739
2740         task = rpc_call_null_helper(clnt, NULL, NULL, 0, NULL, NULL);
2741         if (IS_ERR(task))
2742                 return PTR_ERR(task);
2743         status = task->tk_status;
2744         rpc_put_task(task);
2745         return status;
2746 }
2747
2748 struct rpc_cb_add_xprt_calldata {
2749         struct rpc_xprt_switch *xps;
2750         struct rpc_xprt *xprt;
2751 };
2752
2753 static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2754 {
2755         struct rpc_cb_add_xprt_calldata *data = calldata;
2756
2757         if (task->tk_status == 0)
2758                 rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2759 }
2760
2761 static void rpc_cb_add_xprt_release(void *calldata)
2762 {
2763         struct rpc_cb_add_xprt_calldata *data = calldata;
2764
2765         xprt_put(data->xprt);
2766         xprt_switch_put(data->xps);
2767         kfree(data);
2768 }
2769
2770 static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2771         .rpc_call_prepare = rpc_null_call_prepare,
2772         .rpc_call_done = rpc_cb_add_xprt_done,
2773         .rpc_release = rpc_cb_add_xprt_release,
2774 };
2775
2776 /**
2777  * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2778  * @clnt: pointer to struct rpc_clnt
2779  * @xps: pointer to struct rpc_xprt_switch,
2780  * @xprt: pointer struct rpc_xprt
2781  * @dummy: unused
2782  */
2783 int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2784                 struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2785                 void *dummy)
2786 {
2787         struct rpc_cb_add_xprt_calldata *data;
2788         struct rpc_task *task;
2789
2790         if (xps->xps_nunique_destaddr_xprts + 1 > clnt->cl_max_connect) {
2791                 rcu_read_lock();
2792                 pr_warn("SUNRPC: reached max allowed number (%d), did not add "
2793                         "transport to server: %s\n", clnt->cl_max_connect,
2794                         rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
2795                 rcu_read_unlock();
2796                 return -EINVAL;
2797         }
2798
2799         data = kmalloc(sizeof(*data), GFP_KERNEL);
2800         if (!data)
2801                 return -ENOMEM;
2802         data->xps = xprt_switch_get(xps);
2803         data->xprt = xprt_get(xprt);
2804         if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2805                 rpc_cb_add_xprt_release(data);
2806                 goto success;
2807         }
2808
2809         task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2810                         &rpc_cb_add_xprt_call_ops, data);
2811         data->xps->xps_nunique_destaddr_xprts++;
2812         rpc_put_task(task);
2813 success:
2814         return 1;
2815 }
2816 EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2817
2818 /**
2819  * rpc_clnt_setup_test_and_add_xprt() - test a new transport and add it to a rpc_clnt
2820  *
2821  * This is an rpc_clnt_add_xprt setup() function which returns 1, so that:
2822  *   1) the caller of the test function must dereference the rpc_xprt_switch
2823  *      and the rpc_xprt, and
2824  *   2) the test function must call rpc_xprt_switch_add_xprt, usually in
2825  *      the rpc_call_done routine.
2826  *
2827  * Upon success (return of 1), the test function adds the new
2828  * transport to the rpc_clnt xprt switch
2829  *
2830  * @clnt: struct rpc_clnt to get the new transport
2831  * @xps:  the rpc_xprt_switch to hold the new transport
2832  * @xprt: the rpc_xprt to test
2833  * @data: a struct rpc_add_xprt_test pointer that holds the test function
2834  *        and test function call data
2835  */
2836 int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2837                                      struct rpc_xprt_switch *xps,
2838                                      struct rpc_xprt *xprt,
2839                                      void *data)
2840 {
2841         struct rpc_task *task;
2842         struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
2843         int status = -EADDRINUSE;
2844
2845         xprt = xprt_get(xprt);
2846         xprt_switch_get(xps);
2847
2848         if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2849                 goto out_err;
2850
2851         /* Test the connection */
2852         task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
2853         if (IS_ERR(task)) {
2854                 status = PTR_ERR(task);
2855                 goto out_err;
2856         }
2857         status = task->tk_status;
2858         rpc_put_task(task);
2859
2860         if (status < 0)
2861                 goto out_err;
2862
2863         /* rpc_xprt_switch and rpc_xprt are dereferenced by add_xprt_test() */
2864         xtest->add_xprt_test(clnt, xprt, xtest->data);
2865
2866         xprt_put(xprt);
2867         xprt_switch_put(xps);
2868
2869         /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2870         return 1;
2871 out_err:
2872         xprt_put(xprt);
2873         xprt_switch_put(xps);
2874         pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
2875                 status, xprt->address_strings[RPC_DISPLAY_ADDR]);
2876         return status;
2877 }
2878 EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
2879
2880 /**
2881  * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2882  * @clnt: pointer to struct rpc_clnt
2883  * @xprtargs: pointer to struct xprt_create
2884  * @setup: callback to test and/or set up the connection
2885  * @data: pointer to setup function data
2886  *
2887  * Creates a new transport using the parameters set in @xprtargs and
2888  * adds it to @clnt.
2889  * If a @setup callback is provided, it is called to test and/or set up
2890  * the connection before the new transport is added.
2891  *
2892  */
2893 int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2894                 struct xprt_create *xprtargs,
2895                 int (*setup)(struct rpc_clnt *,
2896                         struct rpc_xprt_switch *,
2897                         struct rpc_xprt *,
2898                         void *),
2899                 void *data)
2900 {
2901         struct rpc_xprt_switch *xps;
2902         struct rpc_xprt *xprt;
2903         unsigned long connect_timeout;
2904         unsigned long reconnect_timeout;
2905         unsigned char resvport, reuseport;
2906         int ret = 0, ident;
2907
2908         rcu_read_lock();
2909         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2910         xprt = xprt_iter_xprt(&clnt->cl_xpi);
2911         if (xps == NULL || xprt == NULL) {
2912                 rcu_read_unlock();
2913                 xprt_switch_put(xps);
2914                 return -EAGAIN;
2915         }
2916         resvport = xprt->resvport;
2917         reuseport = xprt->reuseport;
2918         connect_timeout = xprt->connect_timeout;
2919         reconnect_timeout = xprt->max_reconnect_timeout;
2920         ident = xprt->xprt_class->ident;
2921         rcu_read_unlock();
2922
2923         if (!xprtargs->ident)
2924                 xprtargs->ident = ident;
2925         xprt = xprt_create_transport(xprtargs);
2926         if (IS_ERR(xprt)) {
2927                 ret = PTR_ERR(xprt);
2928                 goto out_put_switch;
2929         }
2930         xprt->resvport = resvport;
2931         xprt->reuseport = reuseport;
2932         if (xprt->ops->set_connect_timeout != NULL)
2933                 xprt->ops->set_connect_timeout(xprt,
2934                                 connect_timeout,
2935                                 reconnect_timeout);
2936
2937         rpc_xprt_switch_set_roundrobin(xps);
2938         if (setup) {
2939                 ret = setup(clnt, xps, xprt, data);
2940                 if (ret != 0)
2941                         goto out_put_xprt;
2942         }
2943         rpc_xprt_switch_add_xprt(xps, xprt);
2944 out_put_xprt:
2945         xprt_put(xprt);
2946 out_put_switch:
2947         xprt_switch_put(xps);
2948         return ret;
2949 }
2950 EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
2951
2952 struct connect_timeout_data {
2953         unsigned long connect_timeout;
2954         unsigned long reconnect_timeout;
2955 };
2956
2957 static int
2958 rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
2959                 struct rpc_xprt *xprt,
2960                 void *data)
2961 {
2962         struct connect_timeout_data *timeo = data;
2963
2964         if (xprt->ops->set_connect_timeout)
2965                 xprt->ops->set_connect_timeout(xprt,
2966                                 timeo->connect_timeout,
2967                                 timeo->reconnect_timeout);
2968         return 0;
2969 }
2970
2971 void
2972 rpc_set_connect_timeout(struct rpc_clnt *clnt,
2973                 unsigned long connect_timeout,
2974                 unsigned long reconnect_timeout)
2975 {
2976         struct connect_timeout_data timeout = {
2977                 .connect_timeout = connect_timeout,
2978                 .reconnect_timeout = reconnect_timeout,
2979         };
2980         rpc_clnt_iterate_for_each_xprt(clnt,
2981                         rpc_xprt_set_connect_timeout,
2982                         &timeout);
2983 }
2984 EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
2985
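     /**
      * rpc_clnt_xprt_switch_put - Drop a reference to the client's rpc_xprt_switch
      * @clnt: pointer to struct rpc_clnt
      */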
2986 void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
2987 {
2988         rcu_read_lock();
2989         xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2990         rcu_read_unlock();
2991 }
2992 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
2993
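     /**
      * rpc_clnt_xprt_switch_add_xprt - Add a transport to the client's rpc_xprt_switch
      * @clnt: pointer to struct rpc_clnt
      * @xprt: the rpc_xprt to add
      */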
2994 void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
2995 {
2996         rcu_read_lock();
2997         rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
2998                                  xprt);
2999         rcu_read_unlock();
3000 }
3001 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
3002
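     /**
      * rpc_clnt_xprt_switch_has_addr - Check whether @clnt already has a transport for an address
      * @clnt: pointer to struct rpc_clnt
      * @sap: the address to look for
      */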
3003 bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
3004                                    const struct sockaddr *sap)
3005 {
3006         struct rpc_xprt_switch *xps;
3007         bool ret;
3008
3009         rcu_read_lock();
3010         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3011         ret = rpc_xprt_switch_has_addr(xps, sap);
3012         rcu_read_unlock();
3013         return ret;
3014 }
3015 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
3016
3017 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
3018 static void rpc_show_header(void)
3019 {
3020         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
3021                 "-timeout ---ops--\n");
3022 }
3023
3024 static void rpc_show_task(const struct rpc_clnt *clnt,
3025                           const struct rpc_task *task)
3026 {
3027         const char *rpc_waitq = "none";
3028
3029         if (RPC_IS_QUEUED(task))
3030                 rpc_waitq = rpc_qname(task->tk_waitqueue);
3031
3032         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
3033                 task->tk_pid, task->tk_flags, task->tk_status,
3034                 clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
3035                 clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
3036                 task->tk_action, rpc_waitq);
3037 }
3038
3039 void rpc_show_tasks(struct net *net)
3040 {
3041         struct rpc_clnt *clnt;
3042         struct rpc_task *task;
3043         int header = 0;
3044         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
3045
3046         spin_lock(&sn->rpc_client_lock);
3047         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
3048                 spin_lock(&clnt->cl_lock);
3049                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3050                         if (!header) {
3051                                 rpc_show_header();
3052                                 header++;
3053                         }
3054                         rpc_show_task(clnt, task);
3055                 }
3056                 spin_unlock(&clnt->cl_lock);
3057         }
3058         spin_unlock(&sn->rpc_client_lock);
3059 }
3060 #endif
3061
3062 #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
3063 static int
3064 rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
3065                 struct rpc_xprt *xprt,
3066                 void *dummy)
3067 {
3068         return xprt_enable_swap(xprt);
3069 }
3070
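     /*
      * Mark the top-level client as a swap client; on the first activation
      * enable swap handling on each of its transports.
      */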
3071 int
3072 rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3073 {
3074         while (clnt != clnt->cl_parent)
3075                 clnt = clnt->cl_parent;
3076         if (atomic_inc_return(&clnt->cl_swapper) == 1)
3077                 return rpc_clnt_iterate_for_each_xprt(clnt,
3078                                 rpc_clnt_swap_activate_callback, NULL);
3079         return 0;
3080 }
3081 EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3082
3083 static int
3084 rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
3085                 struct rpc_xprt *xprt,
3086                 void *dummy)
3087 {
3088         xprt_disable_swap(xprt);
3089         return 0;
3090 }
3091
3092 void
3093 rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3094 {
3095         if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3096                 rpc_clnt_iterate_for_each_xprt(clnt,
3097                                 rpc_clnt_swap_deactivate_callback, NULL);
3098 }
3099 EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
3100 #endif /* CONFIG_SUNRPC_SWAP */