1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  *  linux/net/sunrpc/clnt.c
4  *
5  *  This file contains the high-level RPC interface.
6  *  It is modeled as a finite state machine to support both synchronous
7  *  and asynchronous requests.
8  *
9  *  -   RPC header generation and argument serialization.
10  *  -   Credential refresh.
11  *  -   TCP connect handling.
12  *  -   Retry of operation when it is suspected the operation failed because
13  *      of uid squashing on the server, or when the credentials were stale
14  *      and need to be refreshed, or when a packet was damaged in transit.
15  *      This may have to be moved to the VFS layer.
16  *
17  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
18  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
19  */
20
21
22 #include <linux/module.h>
23 #include <linux/types.h>
24 #include <linux/kallsyms.h>
25 #include <linux/mm.h>
26 #include <linux/namei.h>
27 #include <linux/mount.h>
28 #include <linux/slab.h>
29 #include <linux/rcupdate.h>
30 #include <linux/utsname.h>
31 #include <linux/workqueue.h>
32 #include <linux/in.h>
33 #include <linux/in6.h>
34 #include <linux/un.h>
35
36 #include <linux/sunrpc/clnt.h>
37 #include <linux/sunrpc/addr.h>
38 #include <linux/sunrpc/rpc_pipe_fs.h>
39 #include <linux/sunrpc/metrics.h>
40 #include <linux/sunrpc/bc_xprt.h>
41 #include <trace/events/sunrpc.h>
42
43 #include "sunrpc.h"
44 #include "sysfs.h"
45 #include "netns.h"
46
47 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
48 # define RPCDBG_FACILITY        RPCDBG_CALL
49 #endif
50
51 /*
52  * Wait queue used by rpc_shutdown_client() while a client's task list drains
53  */
54
55 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
56
57
58 static void     call_start(struct rpc_task *task);
59 static void     call_reserve(struct rpc_task *task);
60 static void     call_reserveresult(struct rpc_task *task);
61 static void     call_allocate(struct rpc_task *task);
62 static void     call_encode(struct rpc_task *task);
63 static void     call_decode(struct rpc_task *task);
64 static void     call_bind(struct rpc_task *task);
65 static void     call_bind_status(struct rpc_task *task);
66 static void     call_transmit(struct rpc_task *task);
67 static void     call_status(struct rpc_task *task);
68 static void     call_transmit_status(struct rpc_task *task);
69 static void     call_refresh(struct rpc_task *task);
70 static void     call_refreshresult(struct rpc_task *task);
71 static void     call_connect(struct rpc_task *task);
72 static void     call_connect_status(struct rpc_task *task);
73
74 static int      rpc_encode_header(struct rpc_task *task,
75                                   struct xdr_stream *xdr);
76 static int      rpc_decode_header(struct rpc_task *task,
77                                   struct xdr_stream *xdr);
78 static int      rpc_ping(struct rpc_clnt *clnt);
79 static void     rpc_check_timeout(struct rpc_task *task);
80
81 static void rpc_register_client(struct rpc_clnt *clnt)
82 {
83         struct net *net = rpc_net_ns(clnt);
84         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
85
86         spin_lock(&sn->rpc_client_lock);
87         list_add(&clnt->cl_clients, &sn->all_clients);
88         spin_unlock(&sn->rpc_client_lock);
89 }
90
91 static void rpc_unregister_client(struct rpc_clnt *clnt)
92 {
93         struct net *net = rpc_net_ns(clnt);
94         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
95
96         spin_lock(&sn->rpc_client_lock);
97         list_del(&clnt->cl_clients);
98         spin_unlock(&sn->rpc_client_lock);
99 }
100
101 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
102 {
103         rpc_remove_client_dir(clnt);
104 }
105
106 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
107 {
108         struct net *net = rpc_net_ns(clnt);
109         struct super_block *pipefs_sb;
110
111         pipefs_sb = rpc_get_sb_net(net);
112         if (pipefs_sb) {
113                 __rpc_clnt_remove_pipedir(clnt);
114                 rpc_put_sb_net(net);
115         }
116 }
117
118 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
119                                     struct rpc_clnt *clnt)
120 {
121         static uint32_t clntid;
122         const char *dir_name = clnt->cl_program->pipe_dir_name;
123         char name[15];
124         struct dentry *dir, *dentry;
125
126         dir = rpc_d_lookup_sb(sb, dir_name);
127         if (dir == NULL) {
128                 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
129                 return dir;
130         }
131         for (;;) {
132                 snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
133                 name[sizeof(name) - 1] = '\0';
134                 dentry = rpc_create_client_dir(dir, name, clnt);
135                 if (!IS_ERR(dentry))
136                         break;
137                 if (dentry == ERR_PTR(-EEXIST))
138                         continue;
139                 printk(KERN_INFO "RPC: Couldn't create pipefs entry"
140                                 " %s/%s, error %ld\n",
141                                 dir_name, name, PTR_ERR(dentry));
142                 break;
143         }
144         dput(dir);
145         return dentry;
146 }
147
148 static int
149 rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
150 {
151         struct dentry *dentry;
152
153         if (clnt->cl_program->pipe_dir_name != NULL) {
154                 dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
155                 if (IS_ERR(dentry))
156                         return PTR_ERR(dentry);
157         }
158         return 0;
159 }
160
161 static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
162 {
163         if (clnt->cl_program->pipe_dir_name == NULL)
164                 return 1;
165
166         switch (event) {
167         case RPC_PIPEFS_MOUNT:
168                 if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
169                         return 1;
170                 if (refcount_read(&clnt->cl_count) == 0)
171                         return 1;
172                 break;
173         case RPC_PIPEFS_UMOUNT:
174                 if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
175                         return 1;
176                 break;
177         }
178         return 0;
179 }
180
181 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
182                                    struct super_block *sb)
183 {
184         struct dentry *dentry;
185
186         switch (event) {
187         case RPC_PIPEFS_MOUNT:
188                 dentry = rpc_setup_pipedir_sb(sb, clnt);
189                 if (!dentry)
190                         return -ENOENT;
191                 if (IS_ERR(dentry))
192                         return PTR_ERR(dentry);
193                 break;
194         case RPC_PIPEFS_UMOUNT:
195                 __rpc_clnt_remove_pipedir(clnt);
196                 break;
197         default:
198                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
199                 return -ENOTSUPP;
200         }
201         return 0;
202 }
203
204 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
205                                 struct super_block *sb)
206 {
207         int error = 0;
208
209         for (;; clnt = clnt->cl_parent) {
210                 if (!rpc_clnt_skip_event(clnt, event))
211                         error = __rpc_clnt_handle_event(clnt, event, sb);
212                 if (error || clnt == clnt->cl_parent)
213                         break;
214         }
215         return error;
216 }
217
218 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
219 {
220         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
221         struct rpc_clnt *clnt;
222
223         spin_lock(&sn->rpc_client_lock);
224         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
225                 if (rpc_clnt_skip_event(clnt, event))
226                         continue;
227                 spin_unlock(&sn->rpc_client_lock);
228                 return clnt;
229         }
230         spin_unlock(&sn->rpc_client_lock);
231         return NULL;
232 }
233
234 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
235                             void *ptr)
236 {
237         struct super_block *sb = ptr;
238         struct rpc_clnt *clnt;
239         int error = 0;
240
241         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
242                 error = __rpc_pipefs_event(clnt, event, sb);
243                 if (error)
244                         break;
245         }
246         return error;
247 }
248
249 static struct notifier_block rpc_clients_block = {
250         .notifier_call  = rpc_pipefs_event,
251         .priority       = SUNRPC_PIPEFS_RPC_PRIO,
252 };
253
254 int rpc_clients_notifier_register(void)
255 {
256         return rpc_pipefs_notifier_register(&rpc_clients_block);
257 }
258
259 void rpc_clients_notifier_unregister(void)
260 {
261         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
262 }
263
264 static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
265                 struct rpc_xprt *xprt,
266                 const struct rpc_timeout *timeout)
267 {
268         struct rpc_xprt *old;
269
270         spin_lock(&clnt->cl_lock);
271         old = rcu_dereference_protected(clnt->cl_xprt,
272                         lockdep_is_held(&clnt->cl_lock));
273
274         if (!xprt_bound(xprt))
275                 clnt->cl_autobind = 1;
276
277         clnt->cl_timeout = timeout;
278         rcu_assign_pointer(clnt->cl_xprt, xprt);
279         spin_unlock(&clnt->cl_lock);
280
281         return old;
282 }
283
284 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
285 {
286         clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
287                         nodename, sizeof(clnt->cl_nodename));
288 }
289
290 static int rpc_client_register(struct rpc_clnt *clnt,
291                                rpc_authflavor_t pseudoflavor,
292                                const char *client_name)
293 {
294         struct rpc_auth_create_args auth_args = {
295                 .pseudoflavor = pseudoflavor,
296                 .target_name = client_name,
297         };
298         struct rpc_auth *auth;
299         struct net *net = rpc_net_ns(clnt);
300         struct super_block *pipefs_sb;
301         int err;
302
303         rpc_clnt_debugfs_register(clnt);
304
305         pipefs_sb = rpc_get_sb_net(net);
306         if (pipefs_sb) {
307                 err = rpc_setup_pipedir(pipefs_sb, clnt);
308                 if (err)
309                         goto out;
310         }
311
312         rpc_register_client(clnt);
313         if (pipefs_sb)
314                 rpc_put_sb_net(net);
315
316         auth = rpcauth_create(&auth_args, clnt);
317         if (IS_ERR(auth)) {
318                 dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
319                                 pseudoflavor);
320                 err = PTR_ERR(auth);
321                 goto err_auth;
322         }
323         return 0;
324 err_auth:
325         pipefs_sb = rpc_get_sb_net(net);
326         rpc_unregister_client(clnt);
327         __rpc_clnt_remove_pipedir(clnt);
328 out:
329         if (pipefs_sb)
330                 rpc_put_sb_net(net);
331         rpc_sysfs_client_destroy(clnt);
332         rpc_clnt_debugfs_unregister(clnt);
333         return err;
334 }
335
336 static DEFINE_IDA(rpc_clids);
337
338 void rpc_cleanup_clids(void)
339 {
340         ida_destroy(&rpc_clids);
341 }
342
343 static int rpc_alloc_clid(struct rpc_clnt *clnt)
344 {
345         int clid;
346
347         clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
348         if (clid < 0)
349                 return clid;
350         clnt->cl_clid = clid;
351         return 0;
352 }
353
354 static void rpc_free_clid(struct rpc_clnt *clnt)
355 {
356         ida_simple_remove(&rpc_clids, clnt->cl_clid);
357 }
358
359 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
360                 struct rpc_xprt_switch *xps,
361                 struct rpc_xprt *xprt,
362                 struct rpc_clnt *parent)
363 {
364         const struct rpc_program *program = args->program;
365         const struct rpc_version *version;
366         struct rpc_clnt *clnt = NULL;
367         const struct rpc_timeout *timeout;
368         const char *nodename = args->nodename;
369         int err;
370
371         err = rpciod_up();
372         if (err)
373                 goto out_no_rpciod;
374
375         err = -EINVAL;
376         if (args->version >= program->nrvers)
377                 goto out_err;
378         version = program->version[args->version];
379         if (version == NULL)
380                 goto out_err;
381
382         err = -ENOMEM;
383         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
384         if (!clnt)
385                 goto out_err;
386         clnt->cl_parent = parent ? : clnt;
387
388         err = rpc_alloc_clid(clnt);
389         if (err)
390                 goto out_no_clid;
391
392         clnt->cl_cred     = get_cred(args->cred);
393         clnt->cl_procinfo = version->procs;
394         clnt->cl_maxproc  = version->nrprocs;
395         clnt->cl_prog     = args->prognumber ? : program->number;
396         clnt->cl_vers     = version->number;
397         clnt->cl_stats    = program->stats;
398         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
399         rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
400         err = -ENOMEM;
401         if (clnt->cl_metrics == NULL)
402                 goto out_no_stats;
403         clnt->cl_program  = program;
404         INIT_LIST_HEAD(&clnt->cl_tasks);
405         spin_lock_init(&clnt->cl_lock);
406
407         timeout = xprt->timeout;
408         if (args->timeout != NULL) {
409                 memcpy(&clnt->cl_timeout_default, args->timeout,
410                                 sizeof(clnt->cl_timeout_default));
411                 timeout = &clnt->cl_timeout_default;
412         }
413
414         rpc_clnt_set_transport(clnt, xprt, timeout);
415         xprt->main = true;
416         xprt_iter_init(&clnt->cl_xpi, xps);
417         xprt_switch_put(xps);
418
419         clnt->cl_rtt = &clnt->cl_rtt_default;
420         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
421
422         refcount_set(&clnt->cl_count, 1);
423
424         if (nodename == NULL)
425                 nodename = utsname()->nodename;
426         /* save the nodename */
427         rpc_clnt_set_nodename(clnt, nodename);
428
429         rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
430         err = rpc_client_register(clnt, args->authflavor, args->client_name);
431         if (err)
432                 goto out_no_path;
433         if (parent)
434                 refcount_inc(&parent->cl_count);
435
436         trace_rpc_clnt_new(clnt, xprt, program->name, args->servername);
437         return clnt;
438
439 out_no_path:
440         rpc_free_iostats(clnt->cl_metrics);
441 out_no_stats:
442         put_cred(clnt->cl_cred);
443         rpc_free_clid(clnt);
444 out_no_clid:
445         kfree(clnt);
446 out_err:
447         rpciod_down();
448 out_no_rpciod:
449         xprt_switch_put(xps);
450         xprt_put(xprt);
451         trace_rpc_clnt_new_err(program->name, args->servername, err);
452         return ERR_PTR(err);
453 }
454
455 static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
456                                         struct rpc_xprt *xprt)
457 {
458         struct rpc_clnt *clnt = NULL;
459         struct rpc_xprt_switch *xps;
460
461         if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
462                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
463                 xps = args->bc_xprt->xpt_bc_xps;
464                 xprt_switch_get(xps);
465         } else {
466                 xps = xprt_switch_alloc(xprt, GFP_KERNEL);
467                 if (xps == NULL) {
468                         xprt_put(xprt);
469                         return ERR_PTR(-ENOMEM);
470                 }
471                 if (xprt->bc_xprt) {
472                         xprt_switch_get(xps);
473                         xprt->bc_xprt->xpt_bc_xps = xps;
474                 }
475         }
476         clnt = rpc_new_client(args, xps, xprt, NULL);
477         if (IS_ERR(clnt))
478                 return clnt;
479
480         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
481                 int err = rpc_ping(clnt);
482                 if (err != 0) {
483                         rpc_shutdown_client(clnt);
484                         return ERR_PTR(err);
485                 }
486         }
487
488         clnt->cl_softrtry = 1;
489         if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
490                 clnt->cl_softrtry = 0;
491                 if (args->flags & RPC_CLNT_CREATE_SOFTERR)
492                         clnt->cl_softerr = 1;
493         }
494
495         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
496                 clnt->cl_autobind = 1;
497         if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
498                 clnt->cl_noretranstimeo = 1;
499         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
500                 clnt->cl_discrtry = 1;
501         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
502                 clnt->cl_chatty = 1;
503
504         return clnt;
505 }
506
507 /**
508  * rpc_create - create an RPC client and transport with one call
509  * @args: rpc_clnt create argument structure
510  *
511  * Creates and initializes an RPC transport and an RPC client.
512  *
513  * It can ping the server in order to determine if it is up, and to see if
514  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
515  * this behavior so asynchronous tasks can also use rpc_create.
516  */
517 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
518 {
519         struct rpc_xprt *xprt;
520         struct xprt_create xprtargs = {
521                 .net = args->net,
522                 .ident = args->protocol,
523                 .srcaddr = args->saddress,
524                 .dstaddr = args->address,
525                 .addrlen = args->addrsize,
526                 .servername = args->servername,
527                 .bc_xprt = args->bc_xprt,
528         };
529         char servername[48];
530         struct rpc_clnt *clnt;
531         int i;
532
533         if (args->bc_xprt) {
534                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
535                 xprt = args->bc_xprt->xpt_bc_xprt;
536                 if (xprt) {
537                         xprt_get(xprt);
538                         return rpc_create_xprt(args, xprt);
539                 }
540         }
541
542         if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
543                 xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
544         if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
545                 xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
546         /*
547          * If the caller chooses not to specify a hostname, whip
548          * up a string representation of the passed-in address.
549          */
550         if (xprtargs.servername == NULL) {
551                 struct sockaddr_un *sun =
552                                 (struct sockaddr_un *)args->address;
553                 struct sockaddr_in *sin =
554                                 (struct sockaddr_in *)args->address;
555                 struct sockaddr_in6 *sin6 =
556                                 (struct sockaddr_in6 *)args->address;
557
558                 servername[0] = '\0';
559                 switch (args->address->sa_family) {
560                 case AF_LOCAL:
561                         snprintf(servername, sizeof(servername), "%s",
562                                  sun->sun_path);
563                         break;
564                 case AF_INET:
565                         snprintf(servername, sizeof(servername), "%pI4",
566                                  &sin->sin_addr.s_addr);
567                         break;
568                 case AF_INET6:
569                         snprintf(servername, sizeof(servername), "%pI6",
570                                  &sin6->sin6_addr);
571                         break;
572                 default:
573                         /* caller wants default server name, but
574                          * address family isn't recognized. */
575                         return ERR_PTR(-EINVAL);
576                 }
577                 xprtargs.servername = servername;
578         }
579
580         xprt = xprt_create_transport(&xprtargs);
581         if (IS_ERR(xprt))
582                 return (struct rpc_clnt *)xprt;
583
584         /*
585          * By default, kernel RPC client connects from a reserved port.
586          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
587          * but it is always enabled for rpciod, which handles the connect
588          * operation.
589          */
590         xprt->resvport = 1;
591         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
592                 xprt->resvport = 0;
593         xprt->reuseport = 0;
594         if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
595                 xprt->reuseport = 1;
596
597         clnt = rpc_create_xprt(args, xprt);
598         if (IS_ERR(clnt) || args->nconnect <= 1)
599                 return clnt;
600
601         for (i = 0; i < args->nconnect - 1; i++) {
602                 if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
603                         break;
604         }
605         return clnt;
606 }
607 EXPORT_SYMBOL_GPL(rpc_create);
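/*
 * Editor's illustrative sketch (not part of the upstream file): a typical way
 * a kernel consumer might fill in struct rpc_create_args and call rpc_create().
 * The program descriptor "example_rpc_program" and the caller-supplied server
 * address are hypothetical placeholders; the flag and flavor constants are the
 * standard SUNRPC ones.
 */
#if 0
static struct rpc_clnt *example_create_client(struct net *net,
                                              struct sockaddr_in *sin)
{
        struct rpc_create_args args = {
                .net            = net,
                .protocol       = XPRT_TRANSPORT_TCP,
                .address        = (struct sockaddr *)sin,
                .addrsize       = sizeof(*sin),
                .servername     = "server.example.org",
                .program        = &example_rpc_program,  /* hypothetical */
                .version        = 3,
                .authflavor     = RPC_AUTH_UNIX,
                .flags          = RPC_CLNT_CREATE_NOPING,
        };

        /* Returns a valid rpc_clnt or an ERR_PTR on failure. */
        return rpc_create(&args);
}
#endif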
608
609 /*
610  * This function clones the RPC client structure. It allows us to share the
611  * same transport while varying parameters such as the authentication
612  * flavour.
613  */
614 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
615                                            struct rpc_clnt *clnt)
616 {
617         struct rpc_xprt_switch *xps;
618         struct rpc_xprt *xprt;
619         struct rpc_clnt *new;
620         int err;
621
622         err = -ENOMEM;
623         rcu_read_lock();
624         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
625         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
626         rcu_read_unlock();
627         if (xprt == NULL || xps == NULL) {
628                 xprt_put(xprt);
629                 xprt_switch_put(xps);
630                 goto out_err;
631         }
632         args->servername = xprt->servername;
633         args->nodename = clnt->cl_nodename;
634
635         new = rpc_new_client(args, xps, xprt, clnt);
636         if (IS_ERR(new))
637                 return new;
638
639         /* Turn off autobind on clones */
640         new->cl_autobind = 0;
641         new->cl_softrtry = clnt->cl_softrtry;
642         new->cl_softerr = clnt->cl_softerr;
643         new->cl_noretranstimeo = clnt->cl_noretranstimeo;
644         new->cl_discrtry = clnt->cl_discrtry;
645         new->cl_chatty = clnt->cl_chatty;
646         new->cl_principal = clnt->cl_principal;
647         return new;
648
649 out_err:
650         trace_rpc_clnt_clone_err(clnt, err);
651         return ERR_PTR(err);
652 }
653
654 /**
655  * rpc_clone_client - Clone an RPC client structure
656  *
657  * @clnt: RPC client whose parameters are copied
658  *
659  * Returns a fresh RPC client or an ERR_PTR.
660  */
661 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
662 {
663         struct rpc_create_args args = {
664                 .program        = clnt->cl_program,
665                 .prognumber     = clnt->cl_prog,
666                 .version        = clnt->cl_vers,
667                 .authflavor     = clnt->cl_auth->au_flavor,
668                 .cred           = clnt->cl_cred,
669         };
670         return __rpc_clone_client(&args, clnt);
671 }
672 EXPORT_SYMBOL_GPL(rpc_clone_client);
673
674 /**
675  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
676  *
677  * @clnt: RPC client whose parameters are copied
678  * @flavor: security flavor for new client
679  *
680  * Returns a fresh RPC client or an ERR_PTR.
681  */
682 struct rpc_clnt *
683 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
684 {
685         struct rpc_create_args args = {
686                 .program        = clnt->cl_program,
687                 .prognumber     = clnt->cl_prog,
688                 .version        = clnt->cl_vers,
689                 .authflavor     = flavor,
690                 .cred           = clnt->cl_cred,
691         };
692         return __rpc_clone_client(&args, clnt);
693 }
694 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
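/*
 * Editor's illustrative sketch (not part of the upstream file): cloning an
 * existing client so the same transport is shared while a different security
 * flavor is used.  "clnt" is assumed to be a previously created client;
 * RPC_AUTH_GSS_KRB5 is the standard pseudoflavor constant.
 */
#if 0
        struct rpc_clnt *krb5_clnt;

        krb5_clnt = rpc_clone_client_set_auth(clnt, RPC_AUTH_GSS_KRB5);
        if (IS_ERR(krb5_clnt))
                return PTR_ERR(krb5_clnt);
        /* ... use krb5_clnt, then release it with rpc_shutdown_client() ... */
#endif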
695
696 /**
697  * rpc_switch_client_transport: switch the RPC transport on the fly
698  * @clnt: pointer to a struct rpc_clnt
699  * @args: pointer to the new transport arguments
700  * @timeout: pointer to the new timeout parameters
701  *
702  * This function allows the caller to switch the RPC transport for the
703  * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
704  * server, for instance.  It assumes that the caller has ensured that
705  * there are no active RPC tasks by using some form of locking.
706  *
707  * Returns zero if "clnt" is now using the new xprt.  Otherwise a
708  * negative errno is returned, and "clnt" continues to use the old
709  * xprt.
710  */
711 int rpc_switch_client_transport(struct rpc_clnt *clnt,
712                 struct xprt_create *args,
713                 const struct rpc_timeout *timeout)
714 {
715         const struct rpc_timeout *old_timeo;
716         rpc_authflavor_t pseudoflavor;
717         struct rpc_xprt_switch *xps, *oldxps;
718         struct rpc_xprt *xprt, *old;
719         struct rpc_clnt *parent;
720         int err;
721
722         xprt = xprt_create_transport(args);
723         if (IS_ERR(xprt))
724                 return PTR_ERR(xprt);
725
726         xps = xprt_switch_alloc(xprt, GFP_KERNEL);
727         if (xps == NULL) {
728                 xprt_put(xprt);
729                 return -ENOMEM;
730         }
731
732         pseudoflavor = clnt->cl_auth->au_flavor;
733
734         old_timeo = clnt->cl_timeout;
735         old = rpc_clnt_set_transport(clnt, xprt, timeout);
736         oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
737
738         rpc_unregister_client(clnt);
739         __rpc_clnt_remove_pipedir(clnt);
740         rpc_sysfs_client_destroy(clnt);
741         rpc_clnt_debugfs_unregister(clnt);
742
743         /*
744          * A new transport was created.  "clnt" therefore
745          * becomes the root of a new cl_parent tree.  clnt's
746          * children, if it has any, still point to the old xprt.
747          */
748         parent = clnt->cl_parent;
749         clnt->cl_parent = clnt;
750
751         /*
752          * The old rpc_auth cache cannot be re-used.  GSS
753          * contexts in particular are between a single
754          * client and server.
755          */
756         err = rpc_client_register(clnt, pseudoflavor, NULL);
757         if (err)
758                 goto out_revert;
759
760         synchronize_rcu();
761         if (parent != clnt)
762                 rpc_release_client(parent);
763         xprt_switch_put(oldxps);
764         xprt_put(old);
765         trace_rpc_clnt_replace_xprt(clnt);
766         return 0;
767
768 out_revert:
769         xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
770         rpc_clnt_set_transport(clnt, old, old_timeo);
771         clnt->cl_parent = parent;
772         rpc_client_register(clnt, pseudoflavor, NULL);
773         xprt_switch_put(xps);
774         xprt_put(xprt);
775         trace_rpc_clnt_replace_xprt_err(clnt);
776         return err;
777 }
778 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
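/*
 * Editor's illustrative sketch (not part of the upstream file): switching an
 * idle client over to a different server address.  The caller must guarantee
 * that no RPC tasks are running; "new_addr" and "new_len" are hypothetical,
 * and the client's existing timeout parameters are simply reused here.
 */
#if 0
        struct xprt_create xprtargs = {
                .ident          = XPRT_TRANSPORT_TCP,
                .net            = rpc_net_ns(clnt),
                .dstaddr        = (struct sockaddr *)&new_addr,
                .addrlen        = new_len,
                .servername     = "mirror.example.org",
        };
        int err;

        err = rpc_switch_client_transport(clnt, &xprtargs, clnt->cl_timeout);
        if (err)
                pr_warn("transport switch failed: %d\n", err); /* clnt keeps the old xprt */
#endif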
779
780 static
781 int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
782 {
783         struct rpc_xprt_switch *xps;
784
785         rcu_read_lock();
786         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
787         rcu_read_unlock();
788         if (xps == NULL)
789                 return -EAGAIN;
790         xprt_iter_init_listall(xpi, xps);
791         xprt_switch_put(xps);
792         return 0;
793 }
794
795 /**
796  * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
797  * @clnt: pointer to client
798  * @fn: function to apply
799  * @data: void pointer to function data
800  *
801  * Iterates through the list of RPC transports currently attached to the
802  * client and applies the function fn(clnt, xprt, data).
803  *
804  * On error, the iteration stops, and the function returns the error value.
805  */
806 int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
807                 int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
808                 void *data)
809 {
810         struct rpc_xprt_iter xpi;
811         int ret;
812
813         ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
814         if (ret)
815                 return ret;
816         for (;;) {
817                 struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
818
819                 if (!xprt)
820                         break;
821                 ret = fn(clnt, xprt, data);
822                 xprt_put(xprt);
823                 if (ret < 0)
824                         break;
825         }
826         xprt_iter_destroy(&xpi);
827         return ret;
828 }
829 EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
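/*
 * Editor's illustrative sketch (not part of the upstream file): a callback
 * matching the fn() signature expected by rpc_clnt_iterate_for_each_xprt().
 * Returning 0 lets the iteration continue; a negative value stops it and is
 * returned to the caller.  All names below are hypothetical.
 */
#if 0
static int example_count_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt,
                              void *data)
{
        unsigned int *count = data;

        (*count)++;
        return 0;
}

        /* ... */
        unsigned int nr_xprts = 0;

        rpc_clnt_iterate_for_each_xprt(clnt, example_count_xprt, &nr_xprts);
#endif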
830
831 /*
832  * Kill all tasks for the given client.
833  * XXX: kill their descendants as well?
834  */
835 void rpc_killall_tasks(struct rpc_clnt *clnt)
836 {
837         struct rpc_task *rovr;
838
839
840         if (list_empty(&clnt->cl_tasks))
841                 return;
842
843         /*
844          * Spin lock all_tasks to prevent changes...
845          */
846         trace_rpc_clnt_killall(clnt);
847         spin_lock(&clnt->cl_lock);
848         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
849                 rpc_signal_task(rovr);
850         spin_unlock(&clnt->cl_lock);
851 }
852 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
853
854 /*
855  * Properly shut down an RPC client, terminating all outstanding
856  * requests.
857  */
858 void rpc_shutdown_client(struct rpc_clnt *clnt)
859 {
860         might_sleep();
861
862         trace_rpc_clnt_shutdown(clnt);
863
864         while (!list_empty(&clnt->cl_tasks)) {
865                 rpc_killall_tasks(clnt);
866                 wait_event_timeout(destroy_wait,
867                         list_empty(&clnt->cl_tasks), 1*HZ);
868         }
869
870         rpc_release_client(clnt);
871 }
872 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
873
874 /*
875  * Free an RPC client
876  */
877 static void rpc_free_client_work(struct work_struct *work)
878 {
879         struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
880
881         trace_rpc_clnt_free(clnt);
882
883         /* These might block on processes that might allocate memory,
884          * so they cannot be called in rpciod; they are handled separately
885          * here.
886          */
887         rpc_sysfs_client_destroy(clnt);
888         rpc_clnt_debugfs_unregister(clnt);
889         rpc_free_clid(clnt);
890         rpc_clnt_remove_pipedir(clnt);
891         xprt_put(rcu_dereference_raw(clnt->cl_xprt));
892
893         kfree(clnt);
894         rpciod_down();
895 }
896 static struct rpc_clnt *
897 rpc_free_client(struct rpc_clnt *clnt)
898 {
899         struct rpc_clnt *parent = NULL;
900
901         trace_rpc_clnt_release(clnt);
902         if (clnt->cl_parent != clnt)
903                 parent = clnt->cl_parent;
904         rpc_unregister_client(clnt);
905         rpc_free_iostats(clnt->cl_metrics);
906         clnt->cl_metrics = NULL;
907         xprt_iter_destroy(&clnt->cl_xpi);
908         put_cred(clnt->cl_cred);
909
910         INIT_WORK(&clnt->cl_work, rpc_free_client_work);
911         schedule_work(&clnt->cl_work);
912         return parent;
913 }
914
915 /*
916  * Free an RPC client
917  */
918 static struct rpc_clnt *
919 rpc_free_auth(struct rpc_clnt *clnt)
920 {
921         /*
922          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
923          *       release remaining GSS contexts. This mechanism ensures
924          *       that it can do so safely.
925          */
926         if (clnt->cl_auth != NULL) {
927                 rpcauth_release(clnt->cl_auth);
928                 clnt->cl_auth = NULL;
929         }
930         if (refcount_dec_and_test(&clnt->cl_count))
931                 return rpc_free_client(clnt);
932         return NULL;
933 }
934
935 /*
936  * Release reference to the RPC client
937  */
938 void
939 rpc_release_client(struct rpc_clnt *clnt)
940 {
941         do {
942                 if (list_empty(&clnt->cl_tasks))
943                         wake_up(&destroy_wait);
944                 if (refcount_dec_not_one(&clnt->cl_count))
945                         break;
946                 clnt = rpc_free_auth(clnt);
947         } while (clnt != NULL);
948 }
949 EXPORT_SYMBOL_GPL(rpc_release_client);
950
951 /**
952  * rpc_bind_new_program - bind a new RPC program to an existing client
953  * @old: old rpc_client
954  * @program: rpc program to set
955  * @vers: rpc program version
956  *
957  * Clones the rpc client and sets up a new RPC program. This is mainly
958  * of use for enabling different RPC programs to share the same transport.
959  * The Sun NFSv2/v3 ACL protocol can do this.
960  */
961 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
962                                       const struct rpc_program *program,
963                                       u32 vers)
964 {
965         struct rpc_create_args args = {
966                 .program        = program,
967                 .prognumber     = program->number,
968                 .version        = vers,
969                 .authflavor     = old->cl_auth->au_flavor,
970                 .cred           = old->cl_cred,
971         };
972         struct rpc_clnt *clnt;
973         int err;
974
975         clnt = __rpc_clone_client(&args, old);
976         if (IS_ERR(clnt))
977                 goto out;
978         err = rpc_ping(clnt);
979         if (err != 0) {
980                 rpc_shutdown_client(clnt);
981                 clnt = ERR_PTR(err);
982         }
983 out:
984         return clnt;
985 }
986 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
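/*
 * Editor's illustrative sketch (not part of the upstream file): sharing one
 * transport between two RPC programs, as the NFSv3 ACL protocol does.  The
 * program descriptor "example_acl_program" and the existing "nfs_clnt" are
 * hypothetical placeholders.
 */
#if 0
        struct rpc_clnt *acl_clnt;

        acl_clnt = rpc_bind_new_program(nfs_clnt, &example_acl_program, 3);
        if (IS_ERR(acl_clnt))
                return PTR_ERR(acl_clnt);
#endif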
987
988 struct rpc_xprt *
989 rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
990 {
991         struct rpc_xprt_switch *xps;
992
993         if (!xprt)
994                 return NULL;
995         rcu_read_lock();
996         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
997         atomic_long_inc(&xps->xps_queuelen);
998         rcu_read_unlock();
999         atomic_long_inc(&xprt->queuelen);
1000
1001         return xprt;
1002 }
1003
1004 static void
1005 rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1006 {
1007         struct rpc_xprt_switch *xps;
1008
1009         atomic_long_dec(&xprt->queuelen);
1010         rcu_read_lock();
1011         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1012         atomic_long_dec(&xps->xps_queuelen);
1013         rcu_read_unlock();
1014
1015         xprt_put(xprt);
1016 }
1017
1018 void rpc_task_release_transport(struct rpc_task *task)
1019 {
1020         struct rpc_xprt *xprt = task->tk_xprt;
1021
1022         if (xprt) {
1023                 task->tk_xprt = NULL;
1024                 if (task->tk_client)
1025                         rpc_task_release_xprt(task->tk_client, xprt);
1026                 else
1027                         xprt_put(xprt);
1028         }
1029 }
1030 EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1031
1032 void rpc_task_release_client(struct rpc_task *task)
1033 {
1034         struct rpc_clnt *clnt = task->tk_client;
1035
1036         rpc_task_release_transport(task);
1037         if (clnt != NULL) {
1038                 /* Remove from client task list */
1039                 spin_lock(&clnt->cl_lock);
1040                 list_del(&task->tk_task);
1041                 spin_unlock(&clnt->cl_lock);
1042                 task->tk_client = NULL;
1043
1044                 rpc_release_client(clnt);
1045         }
1046 }
1047
1048 static struct rpc_xprt *
1049 rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1050 {
1051         struct rpc_xprt *xprt;
1052
1053         rcu_read_lock();
1054         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1055         rcu_read_unlock();
1056         return rpc_task_get_xprt(clnt, xprt);
1057 }
1058
1059 static struct rpc_xprt *
1060 rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1061 {
1062         return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1063 }
1064
1065 static
1066 void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1067 {
1068         if (task->tk_xprt &&
1069                         !(test_bit(XPRT_OFFLINE, &task->tk_xprt->state) &&
1070                         (task->tk_flags & RPC_TASK_MOVEABLE)))
1071                 return;
1072         if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1073                 task->tk_xprt = rpc_task_get_first_xprt(clnt);
1074         else
1075                 task->tk_xprt = rpc_task_get_next_xprt(clnt);
1076 }
1077
1078 static
1079 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1080 {
1081         rpc_task_set_transport(task, clnt);
1082         task->tk_client = clnt;
1083         refcount_inc(&clnt->cl_count);
1084         if (clnt->cl_softrtry)
1085                 task->tk_flags |= RPC_TASK_SOFT;
1086         if (clnt->cl_softerr)
1087                 task->tk_flags |= RPC_TASK_TIMEOUT;
1088         if (clnt->cl_noretranstimeo)
1089                 task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1090         /* Add to the client's list of all tasks */
1091         spin_lock(&clnt->cl_lock);
1092         list_add_tail(&task->tk_task, &clnt->cl_tasks);
1093         spin_unlock(&clnt->cl_lock);
1094 }
1095
1096 static void
1097 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1098 {
1099         if (msg != NULL) {
1100                 task->tk_msg.rpc_proc = msg->rpc_proc;
1101                 task->tk_msg.rpc_argp = msg->rpc_argp;
1102                 task->tk_msg.rpc_resp = msg->rpc_resp;
1103                 task->tk_msg.rpc_cred = msg->rpc_cred;
1104                 if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1105                         get_cred(task->tk_msg.rpc_cred);
1106         }
1107 }
1108
1109 /*
1110  * Default callback for async RPC calls
1111  */
1112 static void
1113 rpc_default_callback(struct rpc_task *task, void *data)
1114 {
1115 }
1116
1117 static const struct rpc_call_ops rpc_default_ops = {
1118         .rpc_call_done = rpc_default_callback,
1119 };
1120
1121 /**
1122  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1123  * @task_setup_data: pointer to task initialisation data
1124  */
1125 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1126 {
1127         struct rpc_task *task;
1128
1129         task = rpc_new_task(task_setup_data);
1130         if (IS_ERR(task))
1131                 return task;
1132
1133         if (!RPC_IS_ASYNC(task))
1134                 task->tk_flags |= RPC_TASK_CRED_NOREF;
1135
1136         rpc_task_set_client(task, task_setup_data->rpc_client);
1137         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1138
1139         if (task->tk_action == NULL)
1140                 rpc_call_start(task);
1141
1142         atomic_inc(&task->tk_count);
1143         rpc_execute(task);
1144         return task;
1145 }
1146 EXPORT_SYMBOL_GPL(rpc_run_task);
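/*
 * Editor's illustrative sketch (not part of the upstream file): running an
 * RPC through rpc_run_task() directly.  rpc_run_task() takes an extra
 * reference before returning the task, so the caller drops it with
 * rpc_put_task() once it has read the status it needs.  "msg" and
 * "example_call_ops" are hypothetical caller-provided objects.
 */
#if 0
        struct rpc_task_setup task_setup = {
                .rpc_client     = clnt,
                .rpc_message    = &msg,                 /* filled in by the caller */
                .callback_ops   = &example_call_ops,    /* hypothetical */
                .flags          = RPC_TASK_SOFT,
        };
        struct rpc_task *task;
        int status;

        task = rpc_run_task(&task_setup);
        if (IS_ERR(task))
                return PTR_ERR(task);
        status = task->tk_status;
        rpc_put_task(task);
#endif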
1147
1148 /**
1149  * rpc_call_sync - Perform a synchronous RPC call
1150  * @clnt: pointer to RPC client
1151  * @msg: RPC call parameters
1152  * @flags: RPC call flags
1153  */
1154 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1155 {
1156         struct rpc_task *task;
1157         struct rpc_task_setup task_setup_data = {
1158                 .rpc_client = clnt,
1159                 .rpc_message = msg,
1160                 .callback_ops = &rpc_default_ops,
1161                 .flags = flags,
1162         };
1163         int status;
1164
1165         WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1166         if (flags & RPC_TASK_ASYNC) {
1167                 rpc_release_calldata(task_setup_data.callback_ops,
1168                         task_setup_data.callback_data);
1169                 return -EINVAL;
1170         }
1171
1172         task = rpc_run_task(&task_setup_data);
1173         if (IS_ERR(task))
1174                 return PTR_ERR(task);
1175         status = task->tk_status;
1176         rpc_put_task(task);
1177         return status;
1178 }
1179 EXPORT_SYMBOL_GPL(rpc_call_sync);
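/*
 * Editor's illustrative sketch (not part of the upstream file): issuing a
 * synchronous call.  "example_procinfo", "example_args" and "example_res"
 * stand in for a real procedure table entry and its XDR argument/result
 * structures.
 */
#if 0
        struct rpc_message msg = {
                .rpc_proc       = &example_procinfo,
                .rpc_argp       = &example_args,
                .rpc_resp       = &example_res,
        };
        int status;

        /* Blocks until the reply arrives or the call fails. */
        status = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT);
#endif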
1180
1181 /**
1182  * rpc_call_async - Perform an asynchronous RPC call
1183  * @clnt: pointer to RPC client
1184  * @msg: RPC call parameters
1185  * @flags: RPC call flags
1186  * @tk_ops: RPC call ops
1187  * @data: user call data
1188  */
1189 int
1190 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1191                const struct rpc_call_ops *tk_ops, void *data)
1192 {
1193         struct rpc_task *task;
1194         struct rpc_task_setup task_setup_data = {
1195                 .rpc_client = clnt,
1196                 .rpc_message = msg,
1197                 .callback_ops = tk_ops,
1198                 .callback_data = data,
1199                 .flags = flags|RPC_TASK_ASYNC,
1200         };
1201
1202         task = rpc_run_task(&task_setup_data);
1203         if (IS_ERR(task))
1204                 return PTR_ERR(task);
1205         rpc_put_task(task);
1206         return 0;
1207 }
1208 EXPORT_SYMBOL_GPL(rpc_call_async);
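/*
 * Editor's illustrative sketch (not part of the upstream file): the shape of
 * an rpc_call_ops table for an asynchronous call.  ->rpc_call_done runs when
 * the call completes, and ->rpc_release frees the caller's private data once
 * the task is no longer referenced.  All names here are hypothetical.
 */
#if 0
static void example_call_done(struct rpc_task *task, void *calldata)
{
        struct example_calldata *d = calldata;

        d->result = task->tk_status;
}

static void example_call_release(void *calldata)
{
        kfree(calldata);
}

static const struct rpc_call_ops example_call_ops = {
        .rpc_call_done  = example_call_done,
        .rpc_release    = example_call_release,
};

        /* ... */
        err = rpc_call_async(clnt, &msg, RPC_TASK_SOFT, &example_call_ops, calldata);
#endif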
1209
1210 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1211 static void call_bc_encode(struct rpc_task *task);
1212
1213 /**
1214  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1215  * rpc_execute against it
1216  * @req: RPC request
1217  */
1218 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
1219 {
1220         struct rpc_task *task;
1221         struct rpc_task_setup task_setup_data = {
1222                 .callback_ops = &rpc_default_ops,
1223                 .flags = RPC_TASK_SOFTCONN |
1224                         RPC_TASK_NO_RETRANS_TIMEOUT,
1225         };
1226
1227         dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1228         /*
1229          * Create an rpc_task to send the data
1230          */
1231         task = rpc_new_task(&task_setup_data);
1232         if (IS_ERR(task)) {
1233                 xprt_free_bc_request(req);
1234                 return task;
1235         }
1236
1237         xprt_init_bc_request(req, task);
1238
1239         task->tk_action = call_bc_encode;
1240         atomic_inc(&task->tk_count);
1241         WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1242         rpc_execute(task);
1243
1244         dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1245         return task;
1246 }
1247 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1248
1249 /**
1250  * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1251  * @req: RPC request to prepare
1252  * @pages: vector of struct page pointers
1253  * @base: offset in first page where receive should start, in bytes
1254  * @len: expected size of the upper layer data payload, in bytes
1255  * @hdrsize: expected size of upper layer reply header, in XDR words
1256  *
1257  */
1258 void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1259                              unsigned int base, unsigned int len,
1260                              unsigned int hdrsize)
1261 {
1262         hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1263
1264         xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1265         trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1266 }
1267 EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
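/*
 * Editor's illustrative sketch (not part of the upstream file): how an upper
 * layer might arrange for a READ-style payload to land directly in page
 * cache pages.  "hdr_words" is that protocol's reply header size in XDR
 * words (a placeholder); the helper adds the RPC reply header and any auth
 * alignment, then converts words to bytes (hdrsize << 2).
 */
#if 0
        rpc_prepare_reply_pages(req, args->pages, args->pgbase,
                                args->count, hdr_words);
#endif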
1268
1269 void
1270 rpc_call_start(struct rpc_task *task)
1271 {
1272         task->tk_action = call_start;
1273 }
1274 EXPORT_SYMBOL_GPL(rpc_call_start);
1275
1276 /**
1277  * rpc_peeraddr - extract remote peer address from clnt's xprt
1278  * @clnt: RPC client structure
1279  * @buf: target buffer
1280  * @bufsize: length of target buffer
1281  *
1282  * Returns the number of bytes that are actually in the stored address.
1283  */
1284 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1285 {
1286         size_t bytes;
1287         struct rpc_xprt *xprt;
1288
1289         rcu_read_lock();
1290         xprt = rcu_dereference(clnt->cl_xprt);
1291
1292         bytes = xprt->addrlen;
1293         if (bytes > bufsize)
1294                 bytes = bufsize;
1295         memcpy(buf, &xprt->addr, bytes);
1296         rcu_read_unlock();
1297
1298         return bytes;
1299 }
1300 EXPORT_SYMBOL_GPL(rpc_peeraddr);
1301
1302 /**
1303  * rpc_peeraddr2str - return remote peer address in printable format
1304  * @clnt: RPC client structure
1305  * @format: address format
1306  *
1307  * NB: the lifetime of the memory referenced by the returned pointer is
1308  * the same as the rpc_xprt itself.  As long as the caller uses this
1309  * pointer, it must hold the RCU read lock.
1310  */
1311 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1312                              enum rpc_display_format_t format)
1313 {
1314         struct rpc_xprt *xprt;
1315
1316         xprt = rcu_dereference(clnt->cl_xprt);
1317
1318         if (xprt->address_strings[format] != NULL)
1319                 return xprt->address_strings[format];
1320         else
1321                 return "unprintable";
1322 }
1323 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
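/*
 * Editor's illustrative sketch (not part of the upstream file): because the
 * returned string lives in the rpc_xprt, the caller must hold the RCU read
 * lock for as long as it uses the pointer, e.g. while printing it.
 * RPC_DISPLAY_ADDR is one of the standard rpc_display_format_t values.
 */
#if 0
        rcu_read_lock();
        pr_info("peer is %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
        rcu_read_unlock();
#endif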
1324
1325 static const struct sockaddr_in rpc_inaddr_loopback = {
1326         .sin_family             = AF_INET,
1327         .sin_addr.s_addr        = htonl(INADDR_ANY),
1328 };
1329
1330 static const struct sockaddr_in6 rpc_in6addr_loopback = {
1331         .sin6_family            = AF_INET6,
1332         .sin6_addr              = IN6ADDR_ANY_INIT,
1333 };
1334
1335 /*
1336  * Try a getsockname() on a connected datagram socket.  Using a
1337  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1338  * This conserves the ephemeral port number space.
1339  *
1340  * Returns zero and fills in "buf" if successful; otherwise, a
1341  * negative errno is returned.
1342  */
1343 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1344                         struct sockaddr *buf)
1345 {
1346         struct socket *sock;
1347         int err;
1348
1349         err = __sock_create(net, sap->sa_family,
1350                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1351         if (err < 0) {
1352                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1353                 goto out;
1354         }
1355
1356         switch (sap->sa_family) {
1357         case AF_INET:
1358                 err = kernel_bind(sock,
1359                                 (struct sockaddr *)&rpc_inaddr_loopback,
1360                                 sizeof(rpc_inaddr_loopback));
1361                 break;
1362         case AF_INET6:
1363                 err = kernel_bind(sock,
1364                                 (struct sockaddr *)&rpc_in6addr_loopback,
1365                                 sizeof(rpc_in6addr_loopback));
1366                 break;
1367         default:
1368                 err = -EAFNOSUPPORT;
1369                 goto out;
1370         }
1371         if (err < 0) {
1372                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1373                 goto out_release;
1374         }
1375
1376         err = kernel_connect(sock, sap, salen, 0);
1377         if (err < 0) {
1378                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1379                 goto out_release;
1380         }
1381
1382         err = kernel_getsockname(sock, buf);
1383         if (err < 0) {
1384                 dprintk("RPC:       getsockname failed (%d)\n", err);
1385                 goto out_release;
1386         }
1387
1388         err = 0;
1389         if (buf->sa_family == AF_INET6) {
1390                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1391                 sin6->sin6_scope_id = 0;
1392         }
1393         dprintk("RPC:       %s succeeded\n", __func__);
1394
1395 out_release:
1396         sock_release(sock);
1397 out:
1398         return err;
1399 }
1400
1401 /*
1402  * Scraping a connected socket failed, so we don't have a usable
1403  * local address.  Fallback: generate an address that will prevent
1404  * the server from calling us back.
1405  *
1406  * Returns zero and fills in "buf" if successful; otherwise, a
1407  * negative errno is returned.
1408  */
1409 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1410 {
1411         switch (family) {
1412         case AF_INET:
1413                 if (buflen < sizeof(rpc_inaddr_loopback))
1414                         return -EINVAL;
1415                 memcpy(buf, &rpc_inaddr_loopback,
1416                                 sizeof(rpc_inaddr_loopback));
1417                 break;
1418         case AF_INET6:
1419                 if (buflen < sizeof(rpc_in6addr_loopback))
1420                         return -EINVAL;
1421                 memcpy(buf, &rpc_in6addr_loopback,
1422                                 sizeof(rpc_in6addr_loopback));
1423                 break;
1424         default:
1425                 dprintk("RPC:       %s: address family not supported\n",
1426                         __func__);
1427                 return -EAFNOSUPPORT;
1428         }
1429         dprintk("RPC:       %s: succeeded\n", __func__);
1430         return 0;
1431 }
1432
1433 /**
1434  * rpc_localaddr - discover local endpoint address for an RPC client
1435  * @clnt: RPC client structure
1436  * @buf: target buffer
1437  * @buflen: size of target buffer, in bytes
1438  *
1439  * Returns zero and fills in "buf" and "buflen" if successful;
1440  * otherwise, a negative errno is returned.
1441  *
1442  * This works even if the underlying transport is not currently connected,
1443  * or if the upper layer never previously provided a source address.
1444  *
1445  * The result of this function call is transient: multiple calls in
1446  * succession may give different results, depending on how local
1447  * networking configuration changes over time.
1448  */
1449 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1450 {
1451         struct sockaddr_storage address;
1452         struct sockaddr *sap = (struct sockaddr *)&address;
1453         struct rpc_xprt *xprt;
1454         struct net *net;
1455         size_t salen;
1456         int err;
1457
1458         rcu_read_lock();
1459         xprt = rcu_dereference(clnt->cl_xprt);
1460         salen = xprt->addrlen;
1461         memcpy(sap, &xprt->addr, salen);
1462         net = get_net(xprt->xprt_net);
1463         rcu_read_unlock();
1464
1465         rpc_set_port(sap, 0);
1466         err = rpc_sockname(net, sap, salen, buf);
1467         put_net(net);
1468         if (err != 0)
1469                 /* Couldn't discover local address, return ANYADDR */
1470                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1471         return 0;
1472 }
1473 EXPORT_SYMBOL_GPL(rpc_localaddr);
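/*
 * Editor's illustrative sketch (not part of the upstream file): discovering
 * the local address the client would use to reach its server.  A
 * sockaddr_storage is large enough for any supported address family.
 */
#if 0
        struct sockaddr_storage local;
        int err;

        err = rpc_localaddr(clnt, (struct sockaddr *)&local, sizeof(local));
        if (err)
                return err;
#endif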
1474
1475 void
1476 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1477 {
1478         struct rpc_xprt *xprt;
1479
1480         rcu_read_lock();
1481         xprt = rcu_dereference(clnt->cl_xprt);
1482         if (xprt->ops->set_buffer_size)
1483                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1484         rcu_read_unlock();
1485 }
1486 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1487
1488 /**
1489  * rpc_net_ns - Get the network namespace for this RPC client
1490  * @clnt: RPC client to query
1491  *
1492  */
1493 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1494 {
1495         struct net *ret;
1496
1497         rcu_read_lock();
1498         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1499         rcu_read_unlock();
1500         return ret;
1501 }
1502 EXPORT_SYMBOL_GPL(rpc_net_ns);
1503
1504 /**
1505  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1506  * @clnt: RPC client to query
1507  *
1508  * For stream transports, this is one RPC record fragment (see RFC
1509  * 1831), as we don't support multi-record requests yet.  For datagram
1510  * transports, this is the size of an IP packet minus the IP, UDP, and
1511  * RPC header sizes.
1512  */
1513 size_t rpc_max_payload(struct rpc_clnt *clnt)
1514 {
1515         size_t ret;
1516
1517         rcu_read_lock();
1518         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1519         rcu_read_unlock();
1520         return ret;
1521 }
1522 EXPORT_SYMBOL_GPL(rpc_max_payload);
1523
1524 /**
1525  * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1526  * @clnt: RPC client to query
1527  */
1528 size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1529 {
1530         struct rpc_xprt *xprt;
1531         size_t ret;
1532
1533         rcu_read_lock();
1534         xprt = rcu_dereference(clnt->cl_xprt);
1535         ret = xprt->ops->bc_maxpayload(xprt);
1536         rcu_read_unlock();
1537         return ret;
1538 }
1539 EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1540
1541 unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1542 {
1543         struct rpc_xprt *xprt;
1544         unsigned int ret;
1545
1546         rcu_read_lock();
1547         xprt = rcu_dereference(clnt->cl_xprt);
1548         ret = xprt->ops->bc_num_slots(xprt);
1549         rcu_read_unlock();
1550         return ret;
1551 }
1552 EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1553
1554 /**
1555  * rpc_force_rebind - force transport to check that remote port is unchanged
1556  * @clnt: client to rebind
1557  *
1558  */
1559 void rpc_force_rebind(struct rpc_clnt *clnt)
1560 {
1561         if (clnt->cl_autobind) {
1562                 rcu_read_lock();
1563                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1564                 rcu_read_unlock();
1565         }
1566 }
1567 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1568
1569 static int
1570 __rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1571 {
1572         task->tk_status = 0;
1573         task->tk_rpc_status = 0;
1574         task->tk_action = action;
1575         return 1;
1576 }
1577
1578 /*
1579  * Restart an (async) RPC call. Usually called from within the
1580  * exit handler.
1581  */
1582 int
1583 rpc_restart_call(struct rpc_task *task)
1584 {
1585         return __rpc_restart_call(task, call_start);
1586 }
1587 EXPORT_SYMBOL_GPL(rpc_restart_call);
1588
1589 /*
1590  * Restart an (async) RPC call from the call_prepare state.
1591  * Usually called from within the exit handler.
1592  */
1593 int
1594 rpc_restart_call_prepare(struct rpc_task *task)
1595 {
1596         if (task->tk_ops->rpc_call_prepare != NULL)
1597                 return __rpc_restart_call(task, rpc_prepare_task);
1598         return rpc_restart_call(task);
1599 }
1600 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1601
1602 const char
1603 *rpc_proc_name(const struct rpc_task *task)
1604 {
1605         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1606
1607         if (proc) {
1608                 if (proc->p_name)
1609                         return proc->p_name;
1610                 else
1611                         return "NULL";
1612         } else
1613                 return "no proc";
1614 }
1615
1616 static void
1617 __rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
1618 {
1619         trace_rpc_call_rpcerror(task, tk_status, rpc_status);
1620         task->tk_rpc_status = rpc_status;
1621         rpc_exit(task, tk_status);
1622 }
1623
1624 static void
1625 rpc_call_rpcerror(struct rpc_task *task, int status)
1626 {
1627         __rpc_call_rpcerror(task, status, status);
1628 }
1629
1630 /*
1631  * 0.  Initial state
1632  *
1633  *     Other FSM states can be visited zero or more times, but
1634  *     this state is visited exactly once for each RPC.
1635  */
1636 static void
1637 call_start(struct rpc_task *task)
1638 {
1639         struct rpc_clnt *clnt = task->tk_client;
1640         int idx = task->tk_msg.rpc_proc->p_statidx;
1641
1642         trace_rpc_request(task);
1643
1644         /* Increment call count (version might not be valid for ping) */
1645         if (clnt->cl_program->version[clnt->cl_vers])
1646                 clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1647         clnt->cl_stats->rpccnt++;
1648         task->tk_action = call_reserve;
1649         rpc_task_set_transport(task, clnt);
1650 }
1651
1652 /*
1653  * 1.   Reserve an RPC call slot
1654  */
1655 static void
1656 call_reserve(struct rpc_task *task)
1657 {
1658         task->tk_status  = 0;
1659         task->tk_action  = call_reserveresult;
1660         xprt_reserve(task);
1661 }
1662
1663 static void call_retry_reserve(struct rpc_task *task);
1664
1665 /*
1666  * 1b.  Grok the result of xprt_reserve()
1667  */
1668 static void
1669 call_reserveresult(struct rpc_task *task)
1670 {
1671         int status = task->tk_status;
1672
1673         /*
1674          * After a call to xprt_reserve(), we must have either
1675          * a request slot or else an error status.
1676          */
1677         task->tk_status = 0;
1678         if (status >= 0) {
1679                 if (task->tk_rqstp) {
1680                         task->tk_action = call_refresh;
1681                         return;
1682                 }
1683
1684                 rpc_call_rpcerror(task, -EIO);
1685                 return;
1686         }
1687
1688         switch (status) {
1689         case -ENOMEM:
1690                 rpc_delay(task, HZ >> 2);
1691                 fallthrough;
1692         case -EAGAIN:   /* woken up; retry */
1693                 task->tk_action = call_retry_reserve;
1694                 return;
1695         default:
1696                 rpc_call_rpcerror(task, status);
1697         }
1698 }
1699
1700 /*
1701  * 1c.  Retry reserving an RPC call slot
1702  */
1703 static void
1704 call_retry_reserve(struct rpc_task *task)
1705 {
1706         task->tk_status  = 0;
1707         task->tk_action  = call_reserveresult;
1708         xprt_retry_reserve(task);
1709 }
1710
1711 /*
1712  * 2.   Bind and/or refresh the credentials
1713  */
1714 static void
1715 call_refresh(struct rpc_task *task)
1716 {
1717         task->tk_action = call_refreshresult;
1718         task->tk_status = 0;
1719         task->tk_client->cl_stats->rpcauthrefresh++;
1720         rpcauth_refreshcred(task);
1721 }
1722
1723 /*
1724  * 2a.  Process the results of a credential refresh
1725  */
1726 static void
1727 call_refreshresult(struct rpc_task *task)
1728 {
1729         int status = task->tk_status;
1730
1731         task->tk_status = 0;
1732         task->tk_action = call_refresh;
1733         switch (status) {
1734         case 0:
1735                 if (rpcauth_uptodatecred(task)) {
1736                         task->tk_action = call_allocate;
1737                         return;
1738                 }
1739                 /* Use rate-limiting and a max number of retries if refresh
1740                  * had status 0 but failed to update the cred.
1741                  */
1742                 fallthrough;
1743         case -ETIMEDOUT:
1744                 rpc_delay(task, 3*HZ);
1745                 fallthrough;
1746         case -EAGAIN:
1747                 status = -EACCES;
1748                 fallthrough;
1749         case -EKEYEXPIRED:
1750                 if (!task->tk_cred_retry)
1751                         break;
1752                 task->tk_cred_retry--;
1753                 trace_rpc_retry_refresh_status(task);
1754                 return;
1755         case -ENOMEM:
1756                 rpc_delay(task, HZ >> 4);
1757                 return;
1758         }
1759         trace_rpc_refresh_status(task);
1760         rpc_call_rpcerror(task, status);
1761 }
1762
1763 /*
1764  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1765  *      (Note: buffer memory is freed in xprt_release).
1766  */
1767 static void
1768 call_allocate(struct rpc_task *task)
1769 {
1770         const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1771         struct rpc_rqst *req = task->tk_rqstp;
1772         struct rpc_xprt *xprt = req->rq_xprt;
1773         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1774         int status;
1775
1776         task->tk_status = 0;
1777         task->tk_action = call_encode;
1778
1779         if (req->rq_buffer)
1780                 return;
1781
1782         if (proc->p_proc != 0) {
1783                 BUG_ON(proc->p_arglen == 0);
1784                 if (proc->p_decode != NULL)
1785                         BUG_ON(proc->p_replen == 0);
1786         }
1787
1788         /*
1789          * Calculate the size (in quads) of the RPC call
1790          * and reply headers, and convert both values
1791          * to byte sizes.
1792          */
1793         req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1794                            proc->p_arglen;
1795         req->rq_callsize <<= 2;
1796         /*
1797          * Note: the reply buffer must, at minimum, be large enough to hold
1798          * the 'struct accepted_reply' from RFC 5531.
1799          */
1800         req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1801                         max_t(size_t, proc->p_replen, 2);
1802         req->rq_rcvsize <<= 2;
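        /*
         * Worked example with illustrative numbers: for an auth flavour
         * with au_cslack == 4 and a procedure with p_arglen == 10, the
         * calculation above yields RPC_CALLHDRSIZE + 8 + 10 quads for the
         * call buffer; the "<<= 2" then converts each quad count to bytes.
         */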
1803
1804         status = xprt->ops->buf_alloc(task);
1805         trace_rpc_buf_alloc(task, status);
1806         if (status == 0)
1807                 return;
1808         if (status != -ENOMEM) {
1809                 rpc_call_rpcerror(task, status);
1810                 return;
1811         }
1812
1813         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1814                 task->tk_action = call_allocate;
1815                 rpc_delay(task, HZ>>4);
1816                 return;
1817         }
1818
1819         rpc_call_rpcerror(task, -ERESTARTSYS);
1820 }
1821
1822 static int
1823 rpc_task_need_encode(struct rpc_task *task)
1824 {
1825         return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1826                 (!(task->tk_flags & RPC_TASK_SENT) ||
1827                  !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1828                  xprt_request_need_retransmit(task));
1829 }
1830
1831 static void
1832 rpc_xdr_encode(struct rpc_task *task)
1833 {
1834         struct rpc_rqst *req = task->tk_rqstp;
1835         struct xdr_stream xdr;
1836
1837         xdr_buf_init(&req->rq_snd_buf,
1838                      req->rq_buffer,
1839                      req->rq_callsize);
1840         xdr_buf_init(&req->rq_rcv_buf,
1841                      req->rq_rbuffer,
1842                      req->rq_rcvsize);
1843
1844         req->rq_reply_bytes_recvd = 0;
1845         req->rq_snd_buf.head[0].iov_len = 0;
1846         xdr_init_encode(&xdr, &req->rq_snd_buf,
1847                         req->rq_snd_buf.head[0].iov_base, req);
1848         xdr_free_bvec(&req->rq_snd_buf);
1849         if (rpc_encode_header(task, &xdr))
1850                 return;
1851
1852         task->tk_status = rpcauth_wrap_req(task, &xdr);
1853 }
1854
1855 /*
1856  * 3.   Encode arguments of an RPC call
1857  */
1858 static void
1859 call_encode(struct rpc_task *task)
1860 {
1861         if (!rpc_task_need_encode(task))
1862                 goto out;
1863
1864         /* Dequeue task from the receive queue while we're encoding */
1865         xprt_request_dequeue_xprt(task);
1866         /* Encode here so that rpcsec_gss can use correct sequence number. */
1867         rpc_xdr_encode(task);
1868         /* Add task to reply queue before transmission to avoid races */
1869         if (task->tk_status == 0 && rpc_reply_expected(task))
1870                 task->tk_status = xprt_request_enqueue_receive(task);
1871         /* Did the encode result in an error condition? */
1872         if (task->tk_status != 0) {
1873                 /* Was the error nonfatal? */
1874                 switch (task->tk_status) {
1875                 case -EAGAIN:
1876                 case -ENOMEM:
1877                         rpc_delay(task, HZ >> 4);
1878                         break;
1879                 case -EKEYEXPIRED:
1880                         if (!task->tk_cred_retry) {
1881                                 rpc_exit(task, task->tk_status);
1882                         } else {
1883                                 task->tk_action = call_refresh;
1884                                 task->tk_cred_retry--;
1885                                 trace_rpc_retry_refresh_status(task);
1886                         }
1887                         break;
1888                 default:
1889                         rpc_call_rpcerror(task, task->tk_status);
1890                 }
1891                 return;
1892         }
1893
1894         xprt_request_enqueue_transmit(task);
1895 out:
1896         task->tk_action = call_transmit;
1897         /* Check that the connection is OK */
1898         if (!xprt_bound(task->tk_xprt))
1899                 task->tk_action = call_bind;
1900         else if (!xprt_connected(task->tk_xprt))
1901                 task->tk_action = call_connect;
1902 }
1903
1904 /*
1905  * Helpers to check if the task was already transmitted, and
1906  * to take action when that is the case.
1907  */
1908 static bool
1909 rpc_task_transmitted(struct rpc_task *task)
1910 {
1911         return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1912 }
1913
1914 static void
1915 rpc_task_handle_transmitted(struct rpc_task *task)
1916 {
1917         xprt_end_transmit(task);
1918         task->tk_action = call_transmit_status;
1919 }
1920
1921 /*
1922  * 4.   Get the server port number if not yet set
1923  */
1924 static void
1925 call_bind(struct rpc_task *task)
1926 {
1927         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1928
1929         if (rpc_task_transmitted(task)) {
1930                 rpc_task_handle_transmitted(task);
1931                 return;
1932         }
1933
1934         if (xprt_bound(xprt)) {
1935                 task->tk_action = call_connect;
1936                 return;
1937         }
1938
1939         task->tk_action = call_bind_status;
1940         if (!xprt_prepare_transmit(task))
1941                 return;
1942
1943         xprt->ops->rpcbind(task);
1944 }
1945
1946 /*
1947  * 4a.  Sort out bind result
1948  */
1949 static void
1950 call_bind_status(struct rpc_task *task)
1951 {
1952         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1953         int status = -EIO;
1954
1955         if (rpc_task_transmitted(task)) {
1956                 rpc_task_handle_transmitted(task);
1957                 return;
1958         }
1959
1960         if (task->tk_status >= 0)
1961                 goto out_next;
1962         if (xprt_bound(xprt)) {
1963                 task->tk_status = 0;
1964                 goto out_next;
1965         }
1966
1967         switch (task->tk_status) {
1968         case -ENOMEM:
1969                 rpc_delay(task, HZ >> 2);
1970                 goto retry_timeout;
1971         case -EACCES:
1972                 trace_rpcb_prog_unavail_err(task);
1973                 /* fail immediately if this is an RPC ping */
1974                 if (task->tk_msg.rpc_proc->p_proc == 0) {
1975                         status = -EOPNOTSUPP;
1976                         break;
1977                 }
1978                 if (task->tk_rebind_retry == 0)
1979                         break;
1980                 task->tk_rebind_retry--;
1981                 rpc_delay(task, 3*HZ);
1982                 goto retry_timeout;
1983         case -ENOBUFS:
1984                 rpc_delay(task, HZ >> 2);
1985                 goto retry_timeout;
1986         case -EAGAIN:
1987                 goto retry_timeout;
1988         case -ETIMEDOUT:
1989                 trace_rpcb_timeout_err(task);
1990                 goto retry_timeout;
1991         case -EPFNOSUPPORT:
1992                 /* server doesn't support any rpcbind version we know of */
1993                 trace_rpcb_bind_version_err(task);
1994                 break;
1995         case -EPROTONOSUPPORT:
1996                 trace_rpcb_bind_version_err(task);
1997                 goto retry_timeout;
1998         case -ECONNREFUSED:             /* connection problems */
1999         case -ECONNRESET:
2000         case -ECONNABORTED:
2001         case -ENOTCONN:
2002         case -EHOSTDOWN:
2003         case -ENETDOWN:
2004         case -EHOSTUNREACH:
2005         case -ENETUNREACH:
2006         case -EPIPE:
2007                 trace_rpcb_unreachable_err(task);
2008                 if (!RPC_IS_SOFTCONN(task)) {
2009                         rpc_delay(task, 5*HZ);
2010                         goto retry_timeout;
2011                 }
2012                 status = task->tk_status;
2013                 break;
2014         default:
2015                 trace_rpcb_unrecognized_err(task);
2016         }
2017
2018         rpc_call_rpcerror(task, status);
2019         return;
2020 out_next:
2021         task->tk_action = call_connect;
2022         return;
2023 retry_timeout:
2024         task->tk_status = 0;
2025         task->tk_action = call_bind;
2026         rpc_check_timeout(task);
2027 }
2028
2029 /*
2030  * 4b.  Connect to the RPC server
2031  */
2032 static void
2033 call_connect(struct rpc_task *task)
2034 {
2035         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2036
2037         if (rpc_task_transmitted(task)) {
2038                 rpc_task_handle_transmitted(task);
2039                 return;
2040         }
2041
2042         if (xprt_connected(xprt)) {
2043                 task->tk_action = call_transmit;
2044                 return;
2045         }
2046
2047         task->tk_action = call_connect_status;
2048         if (task->tk_status < 0)
2049                 return;
2050         if (task->tk_flags & RPC_TASK_NOCONNECT) {
2051                 rpc_call_rpcerror(task, -ENOTCONN);
2052                 return;
2053         }
2054         if (!xprt_prepare_transmit(task))
2055                 return;
2056         xprt_connect(task);
2057 }
2058
2059 /*
2060  * 4c.  Sort out connect result
2061  */
2062 static void
2063 call_connect_status(struct rpc_task *task)
2064 {
2065         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2066         struct rpc_clnt *clnt = task->tk_client;
2067         int status = task->tk_status;
2068
2069         if (rpc_task_transmitted(task)) {
2070                 rpc_task_handle_transmitted(task);
2071                 return;
2072         }
2073
2074         trace_rpc_connect_status(task);
2075
2076         if (task->tk_status == 0) {
2077                 clnt->cl_stats->netreconn++;
2078                 goto out_next;
2079         }
2080         if (xprt_connected(xprt)) {
2081                 task->tk_status = 0;
2082                 goto out_next;
2083         }
2084
2085         task->tk_status = 0;
2086         switch (status) {
2087         case -ECONNREFUSED:
2088                 /* A positive refusal suggests a rebind is needed. */
2089                 if (RPC_IS_SOFTCONN(task))
2090                         break;
2091                 if (clnt->cl_autobind) {
2092                         rpc_force_rebind(clnt);
2093                         goto out_retry;
2094                 }
2095                 fallthrough;
2096         case -ECONNRESET:
2097         case -ECONNABORTED:
2098         case -ENETDOWN:
2099         case -ENETUNREACH:
2100         case -EHOSTUNREACH:
2101         case -EPIPE:
2102         case -EPROTO:
2103                 xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
2104                                             task->tk_rqstp->rq_connect_cookie);
2105                 if (RPC_IS_SOFTCONN(task))
2106                         break;
2107                 /* retry with existing socket, after a delay */
2108                 rpc_delay(task, 3*HZ);
2109                 fallthrough;
2110         case -EADDRINUSE:
2111         case -ENOTCONN:
2112         case -EAGAIN:
2113         case -ETIMEDOUT:
2114                 if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
2115                     (task->tk_flags & RPC_TASK_MOVEABLE) &&
2116                     test_bit(XPRT_REMOVE, &xprt->state)) {
2117                         struct rpc_xprt *saved = task->tk_xprt;
2118                         struct rpc_xprt_switch *xps;
2119
2120                         rcu_read_lock();
2121                         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2122                         rcu_read_unlock();
2123                         if (xps->xps_nxprts > 1) {
2124                                 long value;
2125
2126                                 xprt_release(task);
2127                                 value = atomic_long_dec_return(&xprt->queuelen);
2128                                 if (value == 0)
2129                                         rpc_xprt_switch_remove_xprt(xps, saved);
2130                                 xprt_put(saved);
2131                                 task->tk_xprt = NULL;
2132                                 task->tk_action = call_start;
2133                         }
2134                         xprt_switch_put(xps);
2135                         if (!task->tk_xprt)
2136                                 return;
2137                 }
2138                 goto out_retry;
2139         case -ENOBUFS:
2140                 rpc_delay(task, HZ >> 2);
2141                 goto out_retry;
2142         }
2143         rpc_call_rpcerror(task, status);
2144         return;
2145 out_next:
2146         task->tk_action = call_transmit;
2147         return;
2148 out_retry:
2149         /* Check for timeouts before looping back to call_bind */
2150         task->tk_action = call_bind;
2151         rpc_check_timeout(task);
2152 }
2153
2154 /*
2155  * 5.   Transmit the RPC request, and wait for reply
2156  */
2157 static void
2158 call_transmit(struct rpc_task *task)
2159 {
2160         if (rpc_task_transmitted(task)) {
2161                 rpc_task_handle_transmitted(task);
2162                 return;
2163         }
2164
2165         task->tk_action = call_transmit_status;
2166         if (!xprt_prepare_transmit(task))
2167                 return;
2168         task->tk_status = 0;
2169         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2170                 if (!xprt_connected(task->tk_xprt)) {
2171                         task->tk_status = -ENOTCONN;
2172                         return;
2173                 }
2174                 xprt_transmit(task);
2175         }
2176         xprt_end_transmit(task);
2177 }
2178
2179 /*
2180  * 5a.  Handle cleanup after a transmission
2181  */
2182 static void
2183 call_transmit_status(struct rpc_task *task)
2184 {
2185         task->tk_action = call_status;
2186
2187         /*
2188          * Common case: success.  Force the compiler to put this
2189          * test first.
2190          */
2191         if (rpc_task_transmitted(task)) {
2192                 task->tk_status = 0;
2193                 xprt_request_wait_receive(task);
2194                 return;
2195         }
2196
2197         switch (task->tk_status) {
2198         default:
2199                 break;
2200         case -EBADMSG:
2201                 task->tk_status = 0;
2202                 task->tk_action = call_encode;
2203                 break;
2204                 /*
2205                  * Special cases: if we've been waiting on the
2206                  * socket's write_space() callback, or if the
2207                  * socket just returned a connection error,
2208                  * then hold onto the transport lock.
2209                  */
2210         case -ENOMEM:
2211         case -ENOBUFS:
2212                 rpc_delay(task, HZ>>2);
2213                 fallthrough;
2214         case -EBADSLT:
2215         case -EAGAIN:
2216                 task->tk_action = call_transmit;
2217                 task->tk_status = 0;
2218                 break;
2219         case -ECONNREFUSED:
2220         case -EHOSTDOWN:
2221         case -ENETDOWN:
2222         case -EHOSTUNREACH:
2223         case -ENETUNREACH:
2224         case -EPERM:
2225                 if (RPC_IS_SOFTCONN(task)) {
2226                         if (!task->tk_msg.rpc_proc->p_proc)
2227                                 trace_xprt_ping(task->tk_xprt,
2228                                                 task->tk_status);
2229                         rpc_call_rpcerror(task, task->tk_status);
2230                         return;
2231                 }
2232                 fallthrough;
2233         case -ECONNRESET:
2234         case -ECONNABORTED:
2235         case -EADDRINUSE:
2236         case -ENOTCONN:
2237         case -EPIPE:
2238                 task->tk_action = call_bind;
2239                 task->tk_status = 0;
2240                 break;
2241         }
2242         rpc_check_timeout(task);
2243 }
2244
2245 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2246 static void call_bc_transmit(struct rpc_task *task);
2247 static void call_bc_transmit_status(struct rpc_task *task);
2248
2249 static void
2250 call_bc_encode(struct rpc_task *task)
2251 {
2252         xprt_request_enqueue_transmit(task);
2253         task->tk_action = call_bc_transmit;
2254 }
2255
2256 /*
2257  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2258  * addition, disconnect on connectivity errors.
2259  */
2260 static void
2261 call_bc_transmit(struct rpc_task *task)
2262 {
2263         task->tk_action = call_bc_transmit_status;
2264         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2265                 if (!xprt_prepare_transmit(task))
2266                         return;
2267                 task->tk_status = 0;
2268                 xprt_transmit(task);
2269         }
2270         xprt_end_transmit(task);
2271 }
2272
2273 static void
2274 call_bc_transmit_status(struct rpc_task *task)
2275 {
2276         struct rpc_rqst *req = task->tk_rqstp;
2277
2278         if (rpc_task_transmitted(task))
2279                 task->tk_status = 0;
2280
2281         switch (task->tk_status) {
2282         case 0:
2283                 /* Success */
2284         case -ENETDOWN:
2285         case -EHOSTDOWN:
2286         case -EHOSTUNREACH:
2287         case -ENETUNREACH:
2288         case -ECONNRESET:
2289         case -ECONNREFUSED:
2290         case -EADDRINUSE:
2291         case -ENOTCONN:
2292         case -EPIPE:
2293                 break;
2294         case -ENOMEM:
2295         case -ENOBUFS:
2296                 rpc_delay(task, HZ>>2);
2297                 fallthrough;
2298         case -EBADSLT:
2299         case -EAGAIN:
2300                 task->tk_status = 0;
2301                 task->tk_action = call_bc_transmit;
2302                 return;
2303         case -ETIMEDOUT:
2304                 /*
2305                  * Problem reaching the server.  Disconnect and let the
2306                  * forechannel reestablish the connection.  The server will
2307                  * have to retransmit the backchannel request and we'll
2308                  * reprocess it.  Since these ops are idempotent, there's no
2309                  * need to cache our reply at this time.
2310                  */
2311                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2312                         "error: %d\n", task->tk_status);
2313                 xprt_conditional_disconnect(req->rq_xprt,
2314                         req->rq_connect_cookie);
2315                 break;
2316         default:
2317                 /*
2318                  * We were unable to reply and will have to drop the
2319                  * request.  The server should reconnect and retransmit.
2320                  */
2321                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2322                         "error: %d\n", task->tk_status);
2323                 break;
2324         }
2325         task->tk_action = rpc_exit_task;
2326 }
2327 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2328
2329 /*
2330  * 6.   Sort out the RPC call status
2331  */
2332 static void
2333 call_status(struct rpc_task *task)
2334 {
2335         struct rpc_clnt *clnt = task->tk_client;
2336         int             status;
2337
2338         if (!task->tk_msg.rpc_proc->p_proc)
2339                 trace_xprt_ping(task->tk_xprt, task->tk_status);
2340
2341         status = task->tk_status;
2342         if (status >= 0) {
2343                 task->tk_action = call_decode;
2344                 return;
2345         }
2346
2347         trace_rpc_call_status(task);
2348         task->tk_status = 0;
2349         switch(status) {
2350         case -EHOSTDOWN:
2351         case -ENETDOWN:
2352         case -EHOSTUNREACH:
2353         case -ENETUNREACH:
2354         case -EPERM:
2355                 if (RPC_IS_SOFTCONN(task))
2356                         goto out_exit;
2357                 /*
2358                  * Delay any retries for 3 seconds, then handle as if it
2359                  * were a timeout.
2360                  */
2361                 rpc_delay(task, 3*HZ);
2362                 fallthrough;
2363         case -ETIMEDOUT:
2364                 break;
2365         case -ECONNREFUSED:
2366         case -ECONNRESET:
2367         case -ECONNABORTED:
2368         case -ENOTCONN:
2369                 rpc_force_rebind(clnt);
2370                 break;
2371         case -EADDRINUSE:
2372                 rpc_delay(task, 3*HZ);
2373                 fallthrough;
2374         case -EPIPE:
2375         case -EAGAIN:
2376                 break;
2377         case -ENFILE:
2378         case -ENOBUFS:
2379         case -ENOMEM:
2380                 rpc_delay(task, HZ>>2);
2381                 break;
2382         case -EIO:
2383                 /* shutdown or soft timeout */
2384                 goto out_exit;
2385         default:
2386                 if (clnt->cl_chatty)
2387                         printk("%s: RPC call returned error %d\n",
2388                                clnt->cl_program->name, -status);
2389                 goto out_exit;
2390         }
2391         task->tk_action = call_encode;
2392         if (status != -ECONNRESET && status != -ECONNABORTED)
2393                 rpc_check_timeout(task);
2394         return;
2395 out_exit:
2396         rpc_call_rpcerror(task, status);
2397 }
2398
2399 static bool
2400 rpc_check_connected(const struct rpc_rqst *req)
2401 {
2402         /* No allocated request or transport? return true */
2403         if (!req || !req->rq_xprt)
2404                 return true;
2405         return xprt_connected(req->rq_xprt);
2406 }
2407
2408 static void
2409 rpc_check_timeout(struct rpc_task *task)
2410 {
2411         struct rpc_clnt *clnt = task->tk_client;
2412
2413         if (RPC_SIGNALLED(task)) {
2414                 rpc_call_rpcerror(task, -ERESTARTSYS);
2415                 return;
2416         }
2417
2418         if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2419                 return;
2420
2421         trace_rpc_timeout_status(task);
2422         task->tk_timeouts++;
2423
2424         if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2425                 rpc_call_rpcerror(task, -ETIMEDOUT);
2426                 return;
2427         }
2428
2429         if (RPC_IS_SOFT(task)) {
2430                 /*
2431                  * Once a "no retrans timeout" soft task (a.k.a. NFSv4) has
2432                  * been sent, it should time out only if the transport
2433                  * connection gets terminally broken.
2434                  */
2435                 if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2436                     rpc_check_connected(task->tk_rqstp))
2437                         return;
2438
2439                 if (clnt->cl_chatty) {
2440                         pr_notice_ratelimited(
2441                                 "%s: server %s not responding, timed out\n",
2442                                 clnt->cl_program->name,
2443                                 task->tk_xprt->servername);
2444                 }
2445                 if (task->tk_flags & RPC_TASK_TIMEOUT)
2446                         rpc_call_rpcerror(task, -ETIMEDOUT);
2447                 else
2448                         __rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2449                 return;
2450         }
2451
2452         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2453                 task->tk_flags |= RPC_CALL_MAJORSEEN;
2454                 if (clnt->cl_chatty) {
2455                         pr_notice_ratelimited(
2456                                 "%s: server %s not responding, still trying\n",
2457                                 clnt->cl_program->name,
2458                                 task->tk_xprt->servername);
2459                 }
2460         }
2461         rpc_force_rebind(clnt);
2462         /*
2463          * Did our request time out due to an RPCSEC_GSS out-of-sequence
2464          * event? RFC2203 requires the server to drop all such requests.
2465          */
2466         rpcauth_invalcred(task);
2467 }
2468
2469 /*
2470  * 7.   Decode the RPC reply
2471  */
2472 static void
2473 call_decode(struct rpc_task *task)
2474 {
2475         struct rpc_clnt *clnt = task->tk_client;
2476         struct rpc_rqst *req = task->tk_rqstp;
2477         struct xdr_stream xdr;
2478         int err;
2479
2480         if (!task->tk_msg.rpc_proc->p_decode) {
2481                 task->tk_action = rpc_exit_task;
2482                 return;
2483         }
2484
2485         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2486                 if (clnt->cl_chatty) {
2487                         pr_notice_ratelimited("%s: server %s OK\n",
2488                                 clnt->cl_program->name,
2489                                 task->tk_xprt->servername);
2490                 }
2491                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2492         }
2493
2494         /*
2495          * Did we ever call xprt_complete_rqst()? If not, we should assume
2496          * the message is incomplete.
2497          */
2498         err = -EAGAIN;
2499         if (!req->rq_reply_bytes_recvd)
2500                 goto out;
2501
2502         /* Ensure that we see all writes made by xprt_complete_rqst()
2503          * before it changed req->rq_reply_bytes_recvd.
2504          */
2505         smp_rmb();
2506
2507         req->rq_rcv_buf.len = req->rq_private_buf.len;
2508         trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2509
2510         /* Check that the softirq receive buffer is valid */
2511         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2512                                 sizeof(req->rq_rcv_buf)) != 0);
2513
2514         xdr_init_decode(&xdr, &req->rq_rcv_buf,
2515                         req->rq_rcv_buf.head[0].iov_base, req);
2516         err = rpc_decode_header(task, &xdr);
2517 out:
2518         switch (err) {
2519         case 0:
2520                 task->tk_action = rpc_exit_task;
2521                 task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2522                 return;
2523         case -EAGAIN:
2524                 task->tk_status = 0;
2525                 if (task->tk_client->cl_discrtry)
2526                         xprt_conditional_disconnect(req->rq_xprt,
2527                                                     req->rq_connect_cookie);
2528                 task->tk_action = call_encode;
2529                 rpc_check_timeout(task);
2530                 break;
2531         case -EKEYREJECTED:
2532                 task->tk_action = call_reserve;
2533                 rpc_check_timeout(task);
2534                 rpcauth_invalcred(task);
2535                 /* Ensure we obtain a new XID if we retry! */
2536                 xprt_release(task);
2537         }
2538 }
2539
2540 static int
2541 rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2542 {
2543         struct rpc_clnt *clnt = task->tk_client;
2544         struct rpc_rqst *req = task->tk_rqstp;
2545         __be32 *p;
2546         int error;
2547
2548         error = -EMSGSIZE;
2549         p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2550         if (!p)
2551                 goto out_fail;
2552         *p++ = req->rq_xid;
2553         *p++ = rpc_call;
2554         *p++ = cpu_to_be32(RPC_VERSION);
2555         *p++ = cpu_to_be32(clnt->cl_prog);
2556         *p++ = cpu_to_be32(clnt->cl_vers);
2557         *p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2558
2559         error = rpcauth_marshcred(task, xdr);
2560         if (error < 0)
2561                 goto out_fail;
2562         return 0;
2563 out_fail:
2564         trace_rpc_bad_callhdr(task);
2565         rpc_call_rpcerror(task, error);
2566         return error;
2567 }
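
/*
 * For reference, the RPC_CALLHDRSIZE words reserved above carry the fixed
 * part of an RPC call header as laid out in RFC 5531:
 *
 *        xid | msg_type (CALL) | rpcvers (2) | prog | vers | proc
 *
 * followed by the credential and verifier that rpcauth_marshcred() appends.
 */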
2568
2569 static noinline int
2570 rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2571 {
2572         struct rpc_clnt *clnt = task->tk_client;
2573         int error;
2574         __be32 *p;
2575
2576         /* RFC 1014 says that the representation of XDR data must be a
2577          * multiple of four bytes
2578          * - if it isn't, pointer subtraction in the NFS client may give
2579          *   undefined results
2580          */
2581         if (task->tk_rqstp->rq_rcv_buf.len & 3)
2582                 goto out_unparsable;
2583
2584         p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2585         if (!p)
2586                 goto out_unparsable;
2587         p++;    /* skip XID */
2588         if (*p++ != rpc_reply)
2589                 goto out_unparsable;
2590         if (*p++ != rpc_msg_accepted)
2591                 goto out_msg_denied;
2592
2593         error = rpcauth_checkverf(task, xdr);
2594         if (error)
2595                 goto out_verifier;
2596
2597         p = xdr_inline_decode(xdr, sizeof(*p));
2598         if (!p)
2599                 goto out_unparsable;
2600         switch (*p) {
2601         case rpc_success:
2602                 return 0;
2603         case rpc_prog_unavail:
2604                 trace_rpc__prog_unavail(task);
2605                 error = -EPFNOSUPPORT;
2606                 goto out_err;
2607         case rpc_prog_mismatch:
2608                 trace_rpc__prog_mismatch(task);
2609                 error = -EPROTONOSUPPORT;
2610                 goto out_err;
2611         case rpc_proc_unavail:
2612                 trace_rpc__proc_unavail(task);
2613                 error = -EOPNOTSUPP;
2614                 goto out_err;
2615         case rpc_garbage_args:
2616         case rpc_system_err:
2617                 trace_rpc__garbage_args(task);
2618                 error = -EIO;
2619                 break;
2620         default:
2621                 goto out_unparsable;
2622         }
2623
2624 out_garbage:
2625         clnt->cl_stats->rpcgarbage++;
2626         if (task->tk_garb_retry) {
2627                 task->tk_garb_retry--;
2628                 task->tk_action = call_encode;
2629                 return -EAGAIN;
2630         }
2631 out_err:
2632         rpc_call_rpcerror(task, error);
2633         return error;
2634
2635 out_unparsable:
2636         trace_rpc__unparsable(task);
2637         error = -EIO;
2638         goto out_garbage;
2639
2640 out_verifier:
2641         trace_rpc_bad_verifier(task);
2642         goto out_garbage;
2643
2644 out_msg_denied:
2645         error = -EACCES;
2646         p = xdr_inline_decode(xdr, sizeof(*p));
2647         if (!p)
2648                 goto out_unparsable;
2649         switch (*p++) {
2650         case rpc_auth_error:
2651                 break;
2652         case rpc_mismatch:
2653                 trace_rpc__mismatch(task);
2654                 error = -EPROTONOSUPPORT;
2655                 goto out_err;
2656         default:
2657                 goto out_unparsable;
2658         }
2659
2660         p = xdr_inline_decode(xdr, sizeof(*p));
2661         if (!p)
2662                 goto out_unparsable;
2663         switch (*p++) {
2664         case rpc_autherr_rejectedcred:
2665         case rpc_autherr_rejectedverf:
2666         case rpcsec_gsserr_credproblem:
2667         case rpcsec_gsserr_ctxproblem:
2668                 if (!task->tk_cred_retry)
2669                         break;
2670                 task->tk_cred_retry--;
2671                 trace_rpc__stale_creds(task);
2672                 return -EKEYREJECTED;
2673         case rpc_autherr_badcred:
2674         case rpc_autherr_badverf:
2675                 /* possibly garbled cred/verf? */
2676                 if (!task->tk_garb_retry)
2677                         break;
2678                 task->tk_garb_retry--;
2679                 trace_rpc__bad_creds(task);
2680                 task->tk_action = call_encode;
2681                 return -EAGAIN;
2682         case rpc_autherr_tooweak:
2683                 trace_rpc__auth_tooweak(task);
2684                 pr_warn("RPC: server %s requires stronger authentication.\n",
2685                         task->tk_xprt->servername);
2686                 break;
2687         default:
2688                 goto out_unparsable;
2689         }
2690         goto out_err;
2691 }
2692
2693 static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2694                 const void *obj)
2695 {
2696 }
2697
2698 static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2699                 void *obj)
2700 {
2701         return 0;
2702 }
2703
2704 static const struct rpc_procinfo rpcproc_null = {
2705         .p_encode = rpcproc_encode_null,
2706         .p_decode = rpcproc_decode_null,
2707 };
2708
2709 static void
2710 rpc_null_call_prepare(struct rpc_task *task, void *data)
2711 {
2712         task->tk_flags &= ~RPC_TASK_NO_RETRANS_TIMEOUT;
2713         rpc_call_start(task);
2714 }
2715
2716 static const struct rpc_call_ops rpc_null_ops = {
2717         .rpc_call_prepare = rpc_null_call_prepare,
2718         .rpc_call_done = rpc_default_callback,
2719 };
2720
2721 static
2722 struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2723                 struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2724                 const struct rpc_call_ops *ops, void *data)
2725 {
2726         struct rpc_message msg = {
2727                 .rpc_proc = &rpcproc_null,
2728         };
2729         struct rpc_task_setup task_setup_data = {
2730                 .rpc_client = clnt,
2731                 .rpc_xprt = xprt,
2732                 .rpc_message = &msg,
2733                 .rpc_op_cred = cred,
2734                 .callback_ops = ops ?: &rpc_null_ops,
2735                 .callback_data = data,
2736                 .flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2737                          RPC_TASK_NULLCREDS,
2738         };
2739
2740         return rpc_run_task(&task_setup_data);
2741 }
2742
2743 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2744 {
2745         return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2746 }
2747 EXPORT_SYMBOL_GPL(rpc_call_null);
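
/*
 * Example: a minimal sketch of a synchronous NULL call against a specific
 * credential, mirroring rpc_ping() below; the error handling is
 * illustrative only.
 *
 *        struct rpc_task *task;
 *        int status;
 *
 *        task = rpc_call_null(clnt, cred, 0);
 *        if (IS_ERR(task))
 *                return PTR_ERR(task);
 *        status = task->tk_status;
 *        rpc_put_task(task);
 */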
2748
2749 static int rpc_ping(struct rpc_clnt *clnt)
2750 {
2751         struct rpc_task *task;
2752         int status;
2753
2754         task = rpc_call_null_helper(clnt, NULL, NULL, 0, NULL, NULL);
2755         if (IS_ERR(task))
2756                 return PTR_ERR(task);
2757         status = task->tk_status;
2758         rpc_put_task(task);
2759         return status;
2760 }
2761
2762 struct rpc_cb_add_xprt_calldata {
2763         struct rpc_xprt_switch *xps;
2764         struct rpc_xprt *xprt;
2765 };
2766
2767 static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2768 {
2769         struct rpc_cb_add_xprt_calldata *data = calldata;
2770
2771         if (task->tk_status == 0)
2772                 rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2773 }
2774
2775 static void rpc_cb_add_xprt_release(void *calldata)
2776 {
2777         struct rpc_cb_add_xprt_calldata *data = calldata;
2778
2779         xprt_put(data->xprt);
2780         xprt_switch_put(data->xps);
2781         kfree(data);
2782 }
2783
2784 static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2785         .rpc_call_prepare = rpc_null_call_prepare,
2786         .rpc_call_done = rpc_cb_add_xprt_done,
2787         .rpc_release = rpc_cb_add_xprt_release,
2788 };
2789
2790 /**
2791  * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2792  * @clnt: pointer to struct rpc_clnt
2793  * @xps: pointer to struct rpc_xprt_switch
2794  * @xprt: pointer to struct rpc_xprt
2795  * @dummy: unused
2796  */
2797 int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2798                 struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2799                 void *dummy)
2800 {
2801         struct rpc_cb_add_xprt_calldata *data;
2802         struct rpc_task *task;
2803
2804         if (xps->xps_nunique_destaddr_xprts + 1 > clnt->cl_max_connect) {
2805                 rcu_read_lock();
2806                 pr_warn("SUNRPC: reached max allowed number (%d), did not add "
2807                         "transport to server: %s\n", clnt->cl_max_connect,
2808                         rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
2809                 rcu_read_unlock();
2810                 return -EINVAL;
2811         }
2812
2813         data = kmalloc(sizeof(*data), GFP_KERNEL);
2814         if (!data)
2815                 return -ENOMEM;
2816         data->xps = xprt_switch_get(xps);
2817         data->xprt = xprt_get(xprt);
2818         if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2819                 rpc_cb_add_xprt_release(data);
2820                 goto success;
2821         }
2822
2823         task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2824                         &rpc_cb_add_xprt_call_ops, data);
2825         data->xps->xps_nunique_destaddr_xprts++;
2826         rpc_put_task(task);
2827 success:
2828         return 1;
2829 }
2830 EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2831
2832 /**
2833  * rpc_clnt_setup_test_and_add_xprt - test a transport and add it to a clnt
2834  * @clnt: struct rpc_clnt to get the new transport
2835  * @xps:  the rpc_xprt_switch to hold the new transport
2836  * @xprt: the rpc_xprt to test
2837  * @data: a struct rpc_add_xprt_test pointer that holds the test function
2838  *        and test function call data
2839  *
2840  * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
2841  *   1) the caller of the test function must dereference the rpc_xprt_switch
2842  *      and the rpc_xprt.
2843  *   2) the test function must call rpc_xprt_switch_add_xprt, usually in
2844  *      the rpc_call_done routine.
2845  *
2846  * Upon success (return of 1), the test function adds the new
2847  * transport to the rpc_clnt xprt switch.
2849  */
2850 int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2851                                      struct rpc_xprt_switch *xps,
2852                                      struct rpc_xprt *xprt,
2853                                      void *data)
2854 {
2855         struct rpc_task *task;
2856         struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
2857         int status = -EADDRINUSE;
2858
2859         xprt = xprt_get(xprt);
2860         xprt_switch_get(xps);
2861
2862         if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2863                 goto out_err;
2864
2865         /* Test the connection */
2866         task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
2867         if (IS_ERR(task)) {
2868                 status = PTR_ERR(task);
2869                 goto out_err;
2870         }
2871         status = task->tk_status;
2872         rpc_put_task(task);
2873
2874         if (status < 0)
2875                 goto out_err;
2876
2877         /* rpc_xprt_switch and rpc_xprt are dereferenced by add_xprt_test() */
2878         xtest->add_xprt_test(clnt, xprt, xtest->data);
2879
2880         xprt_put(xprt);
2881         xprt_switch_put(xps);
2882
2883         /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2884         return 1;
2885 out_err:
2886         xprt_put(xprt);
2887         xprt_switch_put(xps);
2888         pr_info("RPC:   rpc_clnt_setup_test_and_add_xprt failed: %d addr %s not added\n",
2889                 status, xprt->address_strings[RPC_DISPLAY_ADDR]);
2890         return status;
2891 }
2892 EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
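
/*
 * Example: a minimal sketch of wiring this helper in as the setup callback
 * of rpc_clnt_add_xprt().  The trunking test callback 'my_trunking_test',
 * its cookie 'my_cookie' and 'xprtargs' are hypothetical and must be
 * supplied by the caller.
 *
 *        struct rpc_add_xprt_test xtest = {
 *                .add_xprt_test  = my_trunking_test,
 *                .data           = my_cookie,
 *        };
 *
 *        err = rpc_clnt_add_xprt(clnt, &xprtargs,
 *                                rpc_clnt_setup_test_and_add_xprt, &xtest);
 */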
2893
2894 /**
2895  * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2896  * @clnt: pointer to struct rpc_clnt
2897  * @xprtargs: pointer to struct xprt_create
2898  * @setup: callback to test and/or set up the connection
2899  * @data: pointer to setup function data
2900  *
2901  * Creates a new transport using the parameters set in args and
2902  * adds it to clnt.
2903  * If a setup callback is provided, it is called to test and/or set up
2904  * the connection before the new transport is added.
2905  *
2906  */
2907 int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2908                 struct xprt_create *xprtargs,
2909                 int (*setup)(struct rpc_clnt *,
2910                         struct rpc_xprt_switch *,
2911                         struct rpc_xprt *,
2912                         void *),
2913                 void *data)
2914 {
2915         struct rpc_xprt_switch *xps;
2916         struct rpc_xprt *xprt;
2917         unsigned long connect_timeout;
2918         unsigned long reconnect_timeout;
2919         unsigned char resvport, reuseport;
2920         int ret = 0, ident;
2921
2922         rcu_read_lock();
2923         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2924         xprt = xprt_iter_xprt(&clnt->cl_xpi);
2925         if (xps == NULL || xprt == NULL) {
2926                 rcu_read_unlock();
2927                 xprt_switch_put(xps);
2928                 return -EAGAIN;
2929         }
2930         resvport = xprt->resvport;
2931         reuseport = xprt->reuseport;
2932         connect_timeout = xprt->connect_timeout;
2933         reconnect_timeout = xprt->max_reconnect_timeout;
2934         ident = xprt->xprt_class->ident;
2935         rcu_read_unlock();
2936
2937         if (!xprtargs->ident)
2938                 xprtargs->ident = ident;
2939         xprt = xprt_create_transport(xprtargs);
2940         if (IS_ERR(xprt)) {
2941                 ret = PTR_ERR(xprt);
2942                 goto out_put_switch;
2943         }
2944         xprt->resvport = resvport;
2945         xprt->reuseport = reuseport;
2946         if (xprt->ops->set_connect_timeout != NULL)
2947                 xprt->ops->set_connect_timeout(xprt,
2948                                 connect_timeout,
2949                                 reconnect_timeout);
2950
2951         rpc_xprt_switch_set_roundrobin(xps);
2952         if (setup) {
2953                 ret = setup(clnt, xps, xprt, data);
2954                 if (ret != 0)
2955                         goto out_put_xprt;
2956         }
2957         rpc_xprt_switch_add_xprt(xps, xprt);
2958 out_put_xprt:
2959         xprt_put(xprt);
2960 out_put_switch:
2961         xprt_switch_put(xps);
2962         return ret;
2963 }
2964 EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
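
/*
 * Example: a minimal sketch of adding one more transport to an existing
 * client, letting rpc_clnt_test_and_add_xprt() ping it first.  The
 * xprt_create arguments in 'xprtargs' are assumed to have been filled in
 * by the caller.
 *
 *        err = rpc_clnt_add_xprt(clnt, &xprtargs,
 *                                rpc_clnt_test_and_add_xprt, NULL);
 */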
2965
2966 struct connect_timeout_data {
2967         unsigned long connect_timeout;
2968         unsigned long reconnect_timeout;
2969 };
2970
2971 static int
2972 rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
2973                 struct rpc_xprt *xprt,
2974                 void *data)
2975 {
2976         struct connect_timeout_data *timeo = data;
2977
2978         if (xprt->ops->set_connect_timeout)
2979                 xprt->ops->set_connect_timeout(xprt,
2980                                 timeo->connect_timeout,
2981                                 timeo->reconnect_timeout);
2982         return 0;
2983 }
2984
2985 void
2986 rpc_set_connect_timeout(struct rpc_clnt *clnt,
2987                 unsigned long connect_timeout,
2988                 unsigned long reconnect_timeout)
2989 {
2990         struct connect_timeout_data timeout = {
2991                 .connect_timeout = connect_timeout,
2992                 .reconnect_timeout = reconnect_timeout,
2993         };
2994         rpc_clnt_iterate_for_each_xprt(clnt,
2995                         rpc_xprt_set_connect_timeout,
2996                         &timeout);
2997 }
2998 EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
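
/*
 * Example (illustrative timeout values, in jiffies): tighten the connect
 * and reconnect timeouts on every transport owned by a client.
 *
 *        rpc_set_connect_timeout(clnt, 10 * HZ, 60 * HZ);
 */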
2999
3000 void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
3001 {
3002         rcu_read_lock();
3003         xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3004         rcu_read_unlock();
3005 }
3006 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
3007
3008 void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3009 {
3010         rcu_read_lock();
3011         rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
3012                                  xprt);
3013         rcu_read_unlock();
3014 }
3015 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
3016
3017 bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
3018                                    const struct sockaddr *sap)
3019 {
3020         struct rpc_xprt_switch *xps;
3021         bool ret;
3022
3023         rcu_read_lock();
3024         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3025         ret = rpc_xprt_switch_has_addr(xps, sap);
3026         rcu_read_unlock();
3027         return ret;
3028 }
3029 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
3030
3031 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
3032 static void rpc_show_header(void)
3033 {
3034         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
3035                 "-timeout ---ops--\n");
3036 }
3037
3038 static void rpc_show_task(const struct rpc_clnt *clnt,
3039                           const struct rpc_task *task)
3040 {
3041         const char *rpc_waitq = "none";
3042
3043         if (RPC_IS_QUEUED(task))
3044                 rpc_waitq = rpc_qname(task->tk_waitqueue);
3045
3046         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
3047                 task->tk_pid, task->tk_flags, task->tk_status,
3048                 clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
3049                 clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
3050                 task->tk_action, rpc_waitq);
3051 }
3052
3053 void rpc_show_tasks(struct net *net)
3054 {
3055         struct rpc_clnt *clnt;
3056         struct rpc_task *task;
3057         int header = 0;
3058         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
3059
3060         spin_lock(&sn->rpc_client_lock);
3061         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
3062                 spin_lock(&clnt->cl_lock);
3063                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3064                         if (!header) {
3065                                 rpc_show_header();
3066                                 header++;
3067                         }
3068                         rpc_show_task(clnt, task);
3069                 }
3070                 spin_unlock(&clnt->cl_lock);
3071         }
3072         spin_unlock(&sn->rpc_client_lock);
3073 }
3074 #endif
3075
3076 #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
3077 static int
3078 rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
3079                 struct rpc_xprt *xprt,
3080                 void *dummy)
3081 {
3082         return xprt_enable_swap(xprt);
3083 }
3084
3085 int
3086 rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3087 {
3088         while (clnt != clnt->cl_parent)
3089                 clnt = clnt->cl_parent;
3090         if (atomic_inc_return(&clnt->cl_swapper) == 1)
3091                 return rpc_clnt_iterate_for_each_xprt(clnt,
3092                                 rpc_clnt_swap_activate_callback, NULL);
3093         return 0;
3094 }
3095 EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3096
3097 static int
3098 rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
3099                 struct rpc_xprt *xprt,
3100                 void *dummy)
3101 {
3102         xprt_disable_swap(xprt);
3103         return 0;
3104 }
3105
3106 void
3107 rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3108 {
3109         if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3110                 rpc_clnt_iterate_for_each_xprt(clnt,
3111                                 rpc_clnt_swap_deactivate_callback, NULL);
3112 }
3113 EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
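
/*
 * Example: a minimal sketch of how a filesystem's swap_activate and
 * swap_deactivate address_space operations might pair these calls; the
 * block-mapping setup that swap_activate normally performs is omitted.
 *
 *        static int my_swap_activate(struct swap_info_struct *sis,
 *                                    struct file *file, sector_t *span)
 *        {
 *                ...
 *                return rpc_clnt_swap_activate(clnt);
 *        }
 *
 *        static void my_swap_deactivate(struct file *file)
 *        {
 *                ...
 *                rpc_clnt_swap_deactivate(clnt);
 *        }
 */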
3114 #endif /* CONFIG_SUNRPC_SWAP */