tipc: Remove unused macro TIPC_FWD_MSG
[linux-2.6-microblaze.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2017, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/rhashtable.h>
38 #include <linux/sched/signal.h>
39
40 #include "core.h"
41 #include "name_table.h"
42 #include "node.h"
43 #include "link.h"
44 #include "name_distr.h"
45 #include "socket.h"
46 #include "bcast.h"
47 #include "netlink.h"
48 #include "group.h"
49 #include "trace.h"
50
51 #define NAGLE_START_INIT        4
52 #define NAGLE_START_MAX         1024
53 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
54 #define CONN_PROBING_INTV       msecs_to_jiffies(3600000)  /* [ms] => 1 h */
55 #define TIPC_MAX_PORT           0xffffffff
56 #define TIPC_MIN_PORT           1
57 #define TIPC_ACK_RATE           4       /* ACK at 1/4 of of rcv window size */
58
59 enum {
60         TIPC_LISTEN = TCP_LISTEN,
61         TIPC_ESTABLISHED = TCP_ESTABLISHED,
62         TIPC_OPEN = TCP_CLOSE,
63         TIPC_DISCONNECTING = TCP_CLOSE_WAIT,
64         TIPC_CONNECTING = TCP_SYN_SENT,
65 };
66
67 struct sockaddr_pair {
68         struct sockaddr_tipc sock;
69         struct sockaddr_tipc member;
70 };
71
72 /**
73  * struct tipc_sock - TIPC socket structure
74  * @sk: socket - interacts with 'port' and with user via the socket API
75  * @conn_type: TIPC type used when connection was established
76  * @conn_instance: TIPC instance used when connection was established
77  * @published: non-zero if port has one or more associated names
78  * @max_pkt: maximum packet size "hint" used when building messages sent by port
79  * @maxnagle: maximum size of msg which can be subject to nagle
80  * @portid: unique port identity in TIPC socket hash table
81  * @phdr: preformatted message header used when sending messages
82  * #cong_links: list of congested links
83  * @publications: list of publications for port
84  * @blocking_link: address of the congested link we are currently sleeping on
85  * @pub_count: total # of publications port has made during its lifetime
86  * @conn_timeout: the time we can wait for an unresponded setup request
87  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
88  * @cong_link_cnt: number of congested links
89  * @snt_unacked: # messages sent by socket, and not yet acked by peer
90  * @rcv_unacked: # messages read by user, but not yet acked back to peer
91  * @peer: 'connected' peer for dgram/rdm
92  * @node: hash table node
93  * @mc_method: cookie for use between socket and broadcast layer
94  * @rcu: rcu struct for tipc_sock
95  */
96 struct tipc_sock {
97         struct sock sk;
98         u32 conn_type;
99         u32 conn_instance;
100         int published;
101         u32 max_pkt;
102         u32 maxnagle;
103         u32 portid;
104         struct tipc_msg phdr;
105         struct list_head cong_links;
106         struct list_head publications;
107         u32 pub_count;
108         atomic_t dupl_rcvcnt;
109         u16 conn_timeout;
110         bool probe_unacked;
111         u16 cong_link_cnt;
112         u16 snt_unacked;
113         u16 snd_win;
114         u16 peer_caps;
115         u16 rcv_unacked;
116         u16 rcv_win;
117         struct sockaddr_tipc peer;
118         struct rhash_head node;
119         struct tipc_mc_method mc_method;
120         struct rcu_head rcu;
121         struct tipc_group *group;
122         u32 oneway;
123         u32 nagle_start;
124         u16 snd_backlog;
125         u16 msg_acc;
126         u16 pkt_cnt;
127         bool expect_ack;
128         bool nodelay;
129         bool group_is_open;
130 };
131
132 static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
133 static void tipc_data_ready(struct sock *sk);
134 static void tipc_write_space(struct sock *sk);
135 static void tipc_sock_destruct(struct sock *sk);
136 static int tipc_release(struct socket *sock);
137 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
138                        bool kern);
139 static void tipc_sk_timeout(struct timer_list *t);
140 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
141                            struct tipc_name_seq const *seq);
142 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
143                             struct tipc_name_seq const *seq);
144 static int tipc_sk_leave(struct tipc_sock *tsk);
145 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
146 static int tipc_sk_insert(struct tipc_sock *tsk);
147 static void tipc_sk_remove(struct tipc_sock *tsk);
148 static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
149 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
150 static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack);
151
152 static const struct proto_ops packet_ops;
153 static const struct proto_ops stream_ops;
154 static const struct proto_ops msg_ops;
155 static struct proto tipc_proto;
156 static const struct rhashtable_params tsk_rht_params;
157
158 static u32 tsk_own_node(struct tipc_sock *tsk)
159 {
160         return msg_prevnode(&tsk->phdr);
161 }
162
163 static u32 tsk_peer_node(struct tipc_sock *tsk)
164 {
165         return msg_destnode(&tsk->phdr);
166 }
167
168 static u32 tsk_peer_port(struct tipc_sock *tsk)
169 {
170         return msg_destport(&tsk->phdr);
171 }
172
173 static  bool tsk_unreliable(struct tipc_sock *tsk)
174 {
175         return msg_src_droppable(&tsk->phdr) != 0;
176 }
177
178 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
179 {
180         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
181 }
182
183 static bool tsk_unreturnable(struct tipc_sock *tsk)
184 {
185         return msg_dest_droppable(&tsk->phdr) != 0;
186 }
187
188 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
189 {
190         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
191 }
192
193 static int tsk_importance(struct tipc_sock *tsk)
194 {
195         return msg_importance(&tsk->phdr);
196 }
197
198 static struct tipc_sock *tipc_sk(const struct sock *sk)
199 {
200         return container_of(sk, struct tipc_sock, sk);
201 }
202
203 int tsk_set_importance(struct sock *sk, int imp)
204 {
205         if (imp > TIPC_CRITICAL_IMPORTANCE)
206                 return -EINVAL;
207         msg_set_importance(&tipc_sk(sk)->phdr, (u32)imp);
208         return 0;
209 }
210
211 static bool tsk_conn_cong(struct tipc_sock *tsk)
212 {
213         return tsk->snt_unacked > tsk->snd_win;
214 }
215
216 static u16 tsk_blocks(int len)
217 {
218         return ((len / FLOWCTL_BLK_SZ) + 1);
219 }
220
221 /* tsk_blocks(): translate a buffer size in bytes to number of
222  * advertisable blocks, taking into account the ratio truesize(len)/len
223  * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
224  */
225 static u16 tsk_adv_blocks(int len)
226 {
227         return len / FLOWCTL_BLK_SZ / 4;
228 }
229
230 /* tsk_inc(): increment counter for sent or received data
231  * - If block based flow control is not supported by peer we
232  *   fall back to message based ditto, incrementing the counter
233  */
234 static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
235 {
236         if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
237                 return ((msglen / FLOWCTL_BLK_SZ) + 1);
238         return 1;
239 }
240
241 /* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
242  */
243 static void tsk_set_nagle(struct tipc_sock *tsk)
244 {
245         struct sock *sk = &tsk->sk;
246
247         tsk->maxnagle = 0;
248         if (sk->sk_type != SOCK_STREAM)
249                 return;
250         if (tsk->nodelay)
251                 return;
252         if (!(tsk->peer_caps & TIPC_NAGLE))
253                 return;
254         /* Limit node local buffer size to avoid receive queue overflow */
255         if (tsk->max_pkt == MAX_MSG_SIZE)
256                 tsk->maxnagle = 1500;
257         else
258                 tsk->maxnagle = tsk->max_pkt;
259 }
260
261 /**
262  * tsk_advance_rx_queue - discard first buffer in socket receive queue
263  *
264  * Caller must hold socket lock
265  */
266 static void tsk_advance_rx_queue(struct sock *sk)
267 {
268         trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, " ");
269         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
270 }
271
272 /* tipc_sk_respond() : send response message back to sender
273  */
274 static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
275 {
276         u32 selector;
277         u32 dnode;
278         u32 onode = tipc_own_addr(sock_net(sk));
279
280         if (!tipc_msg_reverse(onode, &skb, err))
281                 return;
282
283         trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "@sk_respond!");
284         dnode = msg_destnode(buf_msg(skb));
285         selector = msg_origport(buf_msg(skb));
286         tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
287 }
288
289 /**
290  * tsk_rej_rx_queue - reject all buffers in socket receive queue
291  *
292  * Caller must hold socket lock
293  */
294 static void tsk_rej_rx_queue(struct sock *sk, int error)
295 {
296         struct sk_buff *skb;
297
298         while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
299                 tipc_sk_respond(sk, skb, error);
300 }
301
302 static bool tipc_sk_connected(struct sock *sk)
303 {
304         return sk->sk_state == TIPC_ESTABLISHED;
305 }
306
307 /* tipc_sk_type_connectionless - check if the socket is datagram socket
308  * @sk: socket
309  *
310  * Returns true if connection less, false otherwise
311  */
312 static bool tipc_sk_type_connectionless(struct sock *sk)
313 {
314         return sk->sk_type == SOCK_RDM || sk->sk_type == SOCK_DGRAM;
315 }
316
317 /* tsk_peer_msg - verify if message was sent by connected port's peer
318  *
319  * Handles cases where the node's network address has changed from
320  * the default of <0.0.0> to its configured setting.
321  */
322 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
323 {
324         struct sock *sk = &tsk->sk;
325         u32 self = tipc_own_addr(sock_net(sk));
326         u32 peer_port = tsk_peer_port(tsk);
327         u32 orig_node, peer_node;
328
329         if (unlikely(!tipc_sk_connected(sk)))
330                 return false;
331
332         if (unlikely(msg_origport(msg) != peer_port))
333                 return false;
334
335         orig_node = msg_orignode(msg);
336         peer_node = tsk_peer_node(tsk);
337
338         if (likely(orig_node == peer_node))
339                 return true;
340
341         if (!orig_node && peer_node == self)
342                 return true;
343
344         if (!peer_node && orig_node == self)
345                 return true;
346
347         return false;
348 }
349
350 /* tipc_set_sk_state - set the sk_state of the socket
351  * @sk: socket
352  *
353  * Caller must hold socket lock
354  *
355  * Returns 0 on success, errno otherwise
356  */
357 static int tipc_set_sk_state(struct sock *sk, int state)
358 {
359         int oldsk_state = sk->sk_state;
360         int res = -EINVAL;
361
362         switch (state) {
363         case TIPC_OPEN:
364                 res = 0;
365                 break;
366         case TIPC_LISTEN:
367         case TIPC_CONNECTING:
368                 if (oldsk_state == TIPC_OPEN)
369                         res = 0;
370                 break;
371         case TIPC_ESTABLISHED:
372                 if (oldsk_state == TIPC_CONNECTING ||
373                     oldsk_state == TIPC_OPEN)
374                         res = 0;
375                 break;
376         case TIPC_DISCONNECTING:
377                 if (oldsk_state == TIPC_CONNECTING ||
378                     oldsk_state == TIPC_ESTABLISHED)
379                         res = 0;
380                 break;
381         }
382
383         if (!res)
384                 sk->sk_state = state;
385
386         return res;
387 }
388
389 static int tipc_sk_sock_err(struct socket *sock, long *timeout)
390 {
391         struct sock *sk = sock->sk;
392         int err = sock_error(sk);
393         int typ = sock->type;
394
395         if (err)
396                 return err;
397         if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
398                 if (sk->sk_state == TIPC_DISCONNECTING)
399                         return -EPIPE;
400                 else if (!tipc_sk_connected(sk))
401                         return -ENOTCONN;
402         }
403         if (!*timeout)
404                 return -EAGAIN;
405         if (signal_pending(current))
406                 return sock_intr_errno(*timeout);
407
408         return 0;
409 }
410
411 #define tipc_wait_for_cond(sock_, timeo_, condition_)                          \
412 ({                                                                             \
413         DEFINE_WAIT_FUNC(wait_, woken_wake_function);                          \
414         struct sock *sk_;                                                      \
415         int rc_;                                                               \
416                                                                                \
417         while ((rc_ = !(condition_))) {                                        \
418                 /* coupled with smp_wmb() in tipc_sk_proto_rcv() */            \
419                 smp_rmb();                                                     \
420                 sk_ = (sock_)->sk;                                             \
421                 rc_ = tipc_sk_sock_err((sock_), timeo_);                       \
422                 if (rc_)                                                       \
423                         break;                                                 \
424                 add_wait_queue(sk_sleep(sk_), &wait_);                         \
425                 release_sock(sk_);                                             \
426                 *(timeo_) = wait_woken(&wait_, TASK_INTERRUPTIBLE, *(timeo_)); \
427                 sched_annotate_sleep();                                        \
428                 lock_sock(sk_);                                                \
429                 remove_wait_queue(sk_sleep(sk_), &wait_);                      \
430         }                                                                      \
431         rc_;                                                                   \
432 })
433
434 /**
435  * tipc_sk_create - create a TIPC socket
436  * @net: network namespace (must be default network)
437  * @sock: pre-allocated socket structure
438  * @protocol: protocol indicator (must be 0)
439  * @kern: caused by kernel or by userspace?
440  *
441  * This routine creates additional data structures used by the TIPC socket,
442  * initializes them, and links them together.
443  *
444  * Returns 0 on success, errno otherwise
445  */
446 static int tipc_sk_create(struct net *net, struct socket *sock,
447                           int protocol, int kern)
448 {
449         const struct proto_ops *ops;
450         struct sock *sk;
451         struct tipc_sock *tsk;
452         struct tipc_msg *msg;
453
454         /* Validate arguments */
455         if (unlikely(protocol != 0))
456                 return -EPROTONOSUPPORT;
457
458         switch (sock->type) {
459         case SOCK_STREAM:
460                 ops = &stream_ops;
461                 break;
462         case SOCK_SEQPACKET:
463                 ops = &packet_ops;
464                 break;
465         case SOCK_DGRAM:
466         case SOCK_RDM:
467                 ops = &msg_ops;
468                 break;
469         default:
470                 return -EPROTOTYPE;
471         }
472
473         /* Allocate socket's protocol area */
474         sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
475         if (sk == NULL)
476                 return -ENOMEM;
477
478         tsk = tipc_sk(sk);
479         tsk->max_pkt = MAX_PKT_DEFAULT;
480         tsk->maxnagle = 0;
481         tsk->nagle_start = NAGLE_START_INIT;
482         INIT_LIST_HEAD(&tsk->publications);
483         INIT_LIST_HEAD(&tsk->cong_links);
484         msg = &tsk->phdr;
485
486         /* Finish initializing socket data structures */
487         sock->ops = ops;
488         sock_init_data(sock, sk);
489         tipc_set_sk_state(sk, TIPC_OPEN);
490         if (tipc_sk_insert(tsk)) {
491                 pr_warn("Socket create failed; port number exhausted\n");
492                 return -EINVAL;
493         }
494
495         /* Ensure tsk is visible before we read own_addr. */
496         smp_mb();
497
498         tipc_msg_init(tipc_own_addr(net), msg, TIPC_LOW_IMPORTANCE,
499                       TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
500
501         msg_set_origport(msg, tsk->portid);
502         timer_setup(&sk->sk_timer, tipc_sk_timeout, 0);
503         sk->sk_shutdown = 0;
504         sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
505         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
506         sk->sk_data_ready = tipc_data_ready;
507         sk->sk_write_space = tipc_write_space;
508         sk->sk_destruct = tipc_sock_destruct;
509         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
510         tsk->group_is_open = true;
511         atomic_set(&tsk->dupl_rcvcnt, 0);
512
513         /* Start out with safe limits until we receive an advertised window */
514         tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
515         tsk->rcv_win = tsk->snd_win;
516
517         if (tipc_sk_type_connectionless(sk)) {
518                 tsk_set_unreturnable(tsk, true);
519                 if (sock->type == SOCK_DGRAM)
520                         tsk_set_unreliable(tsk, true);
521         }
522         __skb_queue_head_init(&tsk->mc_method.deferredq);
523         trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
524         return 0;
525 }
526
527 static void tipc_sk_callback(struct rcu_head *head)
528 {
529         struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
530
531         sock_put(&tsk->sk);
532 }
533
534 /* Caller should hold socket lock for the socket. */
535 static void __tipc_shutdown(struct socket *sock, int error)
536 {
537         struct sock *sk = sock->sk;
538         struct tipc_sock *tsk = tipc_sk(sk);
539         struct net *net = sock_net(sk);
540         long timeout = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
541         u32 dnode = tsk_peer_node(tsk);
542         struct sk_buff *skb;
543
544         /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
545         tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
546                                             !tsk_conn_cong(tsk)));
547
548         /* Push out delayed messages if in Nagle mode */
549         tipc_sk_push_backlog(tsk, false);
550         /* Remove pending SYN */
551         __skb_queue_purge(&sk->sk_write_queue);
552
553         /* Remove partially received buffer if any */
554         skb = skb_peek(&sk->sk_receive_queue);
555         if (skb && TIPC_SKB_CB(skb)->bytes_read) {
556                 __skb_unlink(skb, &sk->sk_receive_queue);
557                 kfree_skb(skb);
558         }
559
560         /* Reject all unreceived messages if connectionless */
561         if (tipc_sk_type_connectionless(sk)) {
562                 tsk_rej_rx_queue(sk, error);
563                 return;
564         }
565
566         switch (sk->sk_state) {
567         case TIPC_CONNECTING:
568         case TIPC_ESTABLISHED:
569                 tipc_set_sk_state(sk, TIPC_DISCONNECTING);
570                 tipc_node_remove_conn(net, dnode, tsk->portid);
571                 /* Send a FIN+/- to its peer */
572                 skb = __skb_dequeue(&sk->sk_receive_queue);
573                 if (skb) {
574                         __skb_queue_purge(&sk->sk_receive_queue);
575                         tipc_sk_respond(sk, skb, error);
576                         break;
577                 }
578                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
579                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
580                                       tsk_own_node(tsk), tsk_peer_port(tsk),
581                                       tsk->portid, error);
582                 if (skb)
583                         tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
584                 break;
585         case TIPC_LISTEN:
586                 /* Reject all SYN messages */
587                 tsk_rej_rx_queue(sk, error);
588                 break;
589         default:
590                 __skb_queue_purge(&sk->sk_receive_queue);
591                 break;
592         }
593 }
594
595 /**
596  * tipc_release - destroy a TIPC socket
597  * @sock: socket to destroy
598  *
599  * This routine cleans up any messages that are still queued on the socket.
600  * For DGRAM and RDM socket types, all queued messages are rejected.
601  * For SEQPACKET and STREAM socket types, the first message is rejected
602  * and any others are discarded.  (If the first message on a STREAM socket
603  * is partially-read, it is discarded and the next one is rejected instead.)
604  *
605  * NOTE: Rejected messages are not necessarily returned to the sender!  They
606  * are returned or discarded according to the "destination droppable" setting
607  * specified for the message by the sender.
608  *
609  * Returns 0 on success, errno otherwise
610  */
611 static int tipc_release(struct socket *sock)
612 {
613         struct sock *sk = sock->sk;
614         struct tipc_sock *tsk;
615
616         /*
617          * Exit if socket isn't fully initialized (occurs when a failed accept()
618          * releases a pre-allocated child socket that was never used)
619          */
620         if (sk == NULL)
621                 return 0;
622
623         tsk = tipc_sk(sk);
624         lock_sock(sk);
625
626         trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " ");
627         __tipc_shutdown(sock, TIPC_ERR_NO_PORT);
628         sk->sk_shutdown = SHUTDOWN_MASK;
629         tipc_sk_leave(tsk);
630         tipc_sk_withdraw(tsk, 0, NULL);
631         __skb_queue_purge(&tsk->mc_method.deferredq);
632         sk_stop_timer(sk, &sk->sk_timer);
633         tipc_sk_remove(tsk);
634
635         sock_orphan(sk);
636         /* Reject any messages that accumulated in backlog queue */
637         release_sock(sk);
638         tipc_dest_list_purge(&tsk->cong_links);
639         tsk->cong_link_cnt = 0;
640         call_rcu(&tsk->rcu, tipc_sk_callback);
641         sock->sk = NULL;
642
643         return 0;
644 }
645
646 /**
647  * tipc_bind - associate or disassocate TIPC name(s) with a socket
648  * @sock: socket structure
649  * @uaddr: socket address describing name(s) and desired operation
650  * @uaddr_len: size of socket address data structure
651  *
652  * Name and name sequence binding is indicated using a positive scope value;
653  * a negative scope value unbinds the specified name.  Specifying no name
654  * (i.e. a socket address length of 0) unbinds all names from the socket.
655  *
656  * Returns 0 on success, errno otherwise
657  *
658  * NOTE: This routine doesn't need to take the socket lock since it doesn't
659  *       access any non-constant socket information.
660  */
661 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
662                      int uaddr_len)
663 {
664         struct sock *sk = sock->sk;
665         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
666         struct tipc_sock *tsk = tipc_sk(sk);
667         int res = -EINVAL;
668
669         lock_sock(sk);
670         if (unlikely(!uaddr_len)) {
671                 res = tipc_sk_withdraw(tsk, 0, NULL);
672                 goto exit;
673         }
674         if (tsk->group) {
675                 res = -EACCES;
676                 goto exit;
677         }
678         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
679                 res = -EINVAL;
680                 goto exit;
681         }
682         if (addr->family != AF_TIPC) {
683                 res = -EAFNOSUPPORT;
684                 goto exit;
685         }
686
687         if (addr->addrtype == TIPC_ADDR_NAME)
688                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
689         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
690                 res = -EAFNOSUPPORT;
691                 goto exit;
692         }
693
694         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
695             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
696             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
697                 res = -EACCES;
698                 goto exit;
699         }
700
701         res = (addr->scope >= 0) ?
702                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
703                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
704 exit:
705         release_sock(sk);
706         return res;
707 }
708
709 /**
710  * tipc_getname - get port ID of socket or peer socket
711  * @sock: socket structure
712  * @uaddr: area for returned socket address
713  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
714  *
715  * Returns 0 on success, errno otherwise
716  *
717  * NOTE: This routine doesn't need to take the socket lock since it only
718  *       accesses socket information that is unchanging (or which changes in
719  *       a completely predictable manner).
720  */
721 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
722                         int peer)
723 {
724         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
725         struct sock *sk = sock->sk;
726         struct tipc_sock *tsk = tipc_sk(sk);
727
728         memset(addr, 0, sizeof(*addr));
729         if (peer) {
730                 if ((!tipc_sk_connected(sk)) &&
731                     ((peer != 2) || (sk->sk_state != TIPC_DISCONNECTING)))
732                         return -ENOTCONN;
733                 addr->addr.id.ref = tsk_peer_port(tsk);
734                 addr->addr.id.node = tsk_peer_node(tsk);
735         } else {
736                 addr->addr.id.ref = tsk->portid;
737                 addr->addr.id.node = tipc_own_addr(sock_net(sk));
738         }
739
740         addr->addrtype = TIPC_ADDR_ID;
741         addr->family = AF_TIPC;
742         addr->scope = 0;
743         addr->addr.name.domain = 0;
744
745         return sizeof(*addr);
746 }
747
748 /**
749  * tipc_poll - read and possibly block on pollmask
750  * @file: file structure associated with the socket
751  * @sock: socket for which to calculate the poll bits
752  * @wait: ???
753  *
754  * Returns pollmask value
755  *
756  * COMMENTARY:
757  * It appears that the usual socket locking mechanisms are not useful here
758  * since the pollmask info is potentially out-of-date the moment this routine
759  * exits.  TCP and other protocols seem to rely on higher level poll routines
760  * to handle any preventable race conditions, so TIPC will do the same ...
761  *
762  * IMPORTANT: The fact that a read or write operation is indicated does NOT
763  * imply that the operation will succeed, merely that it should be performed
764  * and will not block.
765  */
766 static __poll_t tipc_poll(struct file *file, struct socket *sock,
767                               poll_table *wait)
768 {
769         struct sock *sk = sock->sk;
770         struct tipc_sock *tsk = tipc_sk(sk);
771         __poll_t revents = 0;
772
773         sock_poll_wait(file, sock, wait);
774         trace_tipc_sk_poll(sk, NULL, TIPC_DUMP_ALL, " ");
775
776         if (sk->sk_shutdown & RCV_SHUTDOWN)
777                 revents |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
778         if (sk->sk_shutdown == SHUTDOWN_MASK)
779                 revents |= EPOLLHUP;
780
781         switch (sk->sk_state) {
782         case TIPC_ESTABLISHED:
783                 if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
784                         revents |= EPOLLOUT;
785                 fallthrough;
786         case TIPC_LISTEN:
787         case TIPC_CONNECTING:
788                 if (!skb_queue_empty_lockless(&sk->sk_receive_queue))
789                         revents |= EPOLLIN | EPOLLRDNORM;
790                 break;
791         case TIPC_OPEN:
792                 if (tsk->group_is_open && !tsk->cong_link_cnt)
793                         revents |= EPOLLOUT;
794                 if (!tipc_sk_type_connectionless(sk))
795                         break;
796                 if (skb_queue_empty_lockless(&sk->sk_receive_queue))
797                         break;
798                 revents |= EPOLLIN | EPOLLRDNORM;
799                 break;
800         case TIPC_DISCONNECTING:
801                 revents = EPOLLIN | EPOLLRDNORM | EPOLLHUP;
802                 break;
803         }
804         return revents;
805 }
806
807 /**
808  * tipc_sendmcast - send multicast message
809  * @sock: socket structure
810  * @seq: destination address
811  * @msg: message to send
812  * @dlen: length of data to send
813  * @timeout: timeout to wait for wakeup
814  *
815  * Called from function tipc_sendmsg(), which has done all sanity checks
816  * Returns the number of bytes sent on success, or errno
817  */
818 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
819                           struct msghdr *msg, size_t dlen, long timeout)
820 {
821         struct sock *sk = sock->sk;
822         struct tipc_sock *tsk = tipc_sk(sk);
823         struct tipc_msg *hdr = &tsk->phdr;
824         struct net *net = sock_net(sk);
825         int mtu = tipc_bcast_get_mtu(net);
826         struct tipc_mc_method *method = &tsk->mc_method;
827         struct sk_buff_head pkts;
828         struct tipc_nlist dsts;
829         int rc;
830
831         if (tsk->group)
832                 return -EACCES;
833
834         /* Block or return if any destination link is congested */
835         rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
836         if (unlikely(rc))
837                 return rc;
838
839         /* Lookup destination nodes */
840         tipc_nlist_init(&dsts, tipc_own_addr(net));
841         tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
842                                       seq->upper, &dsts);
843         if (!dsts.local && !dsts.remote)
844                 return -EHOSTUNREACH;
845
846         /* Build message header */
847         msg_set_type(hdr, TIPC_MCAST_MSG);
848         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
849         msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
850         msg_set_destport(hdr, 0);
851         msg_set_destnode(hdr, 0);
852         msg_set_nametype(hdr, seq->type);
853         msg_set_namelower(hdr, seq->lower);
854         msg_set_nameupper(hdr, seq->upper);
855
856         /* Build message as chain of buffers */
857         __skb_queue_head_init(&pkts);
858         rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
859
860         /* Send message if build was successful */
861         if (unlikely(rc == dlen)) {
862                 trace_tipc_sk_sendmcast(sk, skb_peek(&pkts),
863                                         TIPC_DUMP_SK_SNDQ, " ");
864                 rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
865                                      &tsk->cong_link_cnt);
866         }
867
868         tipc_nlist_purge(&dsts);
869
870         return rc ? rc : dlen;
871 }
872
873 /**
874  * tipc_send_group_msg - send a message to a member in the group
875  * @net: network namespace
876  * @m: message to send
877  * @mb: group member
878  * @dnode: destination node
879  * @dport: destination port
880  * @dlen: total length of message data
881  */
882 static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
883                                struct msghdr *m, struct tipc_member *mb,
884                                u32 dnode, u32 dport, int dlen)
885 {
886         u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
887         struct tipc_mc_method *method = &tsk->mc_method;
888         int blks = tsk_blocks(GROUP_H_SIZE + dlen);
889         struct tipc_msg *hdr = &tsk->phdr;
890         struct sk_buff_head pkts;
891         int mtu, rc;
892
893         /* Complete message header */
894         msg_set_type(hdr, TIPC_GRP_UCAST_MSG);
895         msg_set_hdr_sz(hdr, GROUP_H_SIZE);
896         msg_set_destport(hdr, dport);
897         msg_set_destnode(hdr, dnode);
898         msg_set_grp_bc_seqno(hdr, bc_snd_nxt);
899
900         /* Build message as chain of buffers */
901         __skb_queue_head_init(&pkts);
902         mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);
903         rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
904         if (unlikely(rc != dlen))
905                 return rc;
906
907         /* Send message */
908         rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
909         if (unlikely(rc == -ELINKCONG)) {
910                 tipc_dest_push(&tsk->cong_links, dnode, 0);
911                 tsk->cong_link_cnt++;
912         }
913
914         /* Update send window */
915         tipc_group_update_member(mb, blks);
916
917         /* A broadcast sent within next EXPIRE period must follow same path */
918         method->rcast = true;
919         method->mandatory = true;
920         return dlen;
921 }
922
923 /**
924  * tipc_send_group_unicast - send message to a member in the group
925  * @sock: socket structure
926  * @m: message to send
927  * @dlen: total length of message data
928  * @timeout: timeout to wait for wakeup
929  *
930  * Called from function tipc_sendmsg(), which has done all sanity checks
931  * Returns the number of bytes sent on success, or errno
932  */
933 static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
934                                    int dlen, long timeout)
935 {
936         struct sock *sk = sock->sk;
937         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
938         int blks = tsk_blocks(GROUP_H_SIZE + dlen);
939         struct tipc_sock *tsk = tipc_sk(sk);
940         struct net *net = sock_net(sk);
941         struct tipc_member *mb = NULL;
942         u32 node, port;
943         int rc;
944
945         node = dest->addr.id.node;
946         port = dest->addr.id.ref;
947         if (!port && !node)
948                 return -EHOSTUNREACH;
949
950         /* Block or return if destination link or member is congested */
951         rc = tipc_wait_for_cond(sock, &timeout,
952                                 !tipc_dest_find(&tsk->cong_links, node, 0) &&
953                                 tsk->group &&
954                                 !tipc_group_cong(tsk->group, node, port, blks,
955                                                  &mb));
956         if (unlikely(rc))
957                 return rc;
958
959         if (unlikely(!mb))
960                 return -EHOSTUNREACH;
961
962         rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen);
963
964         return rc ? rc : dlen;
965 }
966
967 /**
968  * tipc_send_group_anycast - send message to any member with given identity
969  * @sock: socket structure
970  * @m: message to send
971  * @dlen: total length of message data
972  * @timeout: timeout to wait for wakeup
973  *
974  * Called from function tipc_sendmsg(), which has done all sanity checks
975  * Returns the number of bytes sent on success, or errno
976  */
977 static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
978                                    int dlen, long timeout)
979 {
980         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
981         struct sock *sk = sock->sk;
982         struct tipc_sock *tsk = tipc_sk(sk);
983         struct list_head *cong_links = &tsk->cong_links;
984         int blks = tsk_blocks(GROUP_H_SIZE + dlen);
985         struct tipc_msg *hdr = &tsk->phdr;
986         struct tipc_member *first = NULL;
987         struct tipc_member *mbr = NULL;
988         struct net *net = sock_net(sk);
989         u32 node, port, exclude;
990         struct list_head dsts;
991         u32 type, inst, scope;
992         int lookups = 0;
993         int dstcnt, rc;
994         bool cong;
995
996         INIT_LIST_HEAD(&dsts);
997
998         type = msg_nametype(hdr);
999         inst = dest->addr.name.name.instance;
1000         scope = msg_lookup_scope(hdr);
1001
1002         while (++lookups < 4) {
1003                 exclude = tipc_group_exclude(tsk->group);
1004
1005                 first = NULL;
1006
1007                 /* Look for a non-congested destination member, if any */
1008                 while (1) {
1009                         if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
1010                                                  &dstcnt, exclude, false))
1011                                 return -EHOSTUNREACH;
1012                         tipc_dest_pop(&dsts, &node, &port);
1013                         cong = tipc_group_cong(tsk->group, node, port, blks,
1014                                                &mbr);
1015                         if (!cong)
1016                                 break;
1017                         if (mbr == first)
1018                                 break;
1019                         if (!first)
1020                                 first = mbr;
1021                 }
1022
1023                 /* Start over if destination was not in member list */
1024                 if (unlikely(!mbr))
1025                         continue;
1026
1027                 if (likely(!cong && !tipc_dest_find(cong_links, node, 0)))
1028                         break;
1029
1030                 /* Block or return if destination link or member is congested */
1031                 rc = tipc_wait_for_cond(sock, &timeout,
1032                                         !tipc_dest_find(cong_links, node, 0) &&
1033                                         tsk->group &&
1034                                         !tipc_group_cong(tsk->group, node, port,
1035                                                          blks, &mbr));
1036                 if (unlikely(rc))
1037                         return rc;
1038
1039                 /* Send, unless destination disappeared while waiting */
1040                 if (likely(mbr))
1041                         break;
1042         }
1043
1044         if (unlikely(lookups >= 4))
1045                 return -EHOSTUNREACH;
1046
1047         rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen);
1048
1049         return rc ? rc : dlen;
1050 }
1051
1052 /**
1053  * tipc_send_group_bcast - send message to all members in communication group
1054  * @sock: socket structure
1055  * @m: message to send
1056  * @dlen: total length of message data
1057  * @timeout: timeout to wait for wakeup
1058  *
1059  * Called from function tipc_sendmsg(), which has done all sanity checks
1060  * Returns the number of bytes sent on success, or errno
1061  */
1062 static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
1063                                  int dlen, long timeout)
1064 {
1065         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1066         struct sock *sk = sock->sk;
1067         struct net *net = sock_net(sk);
1068         struct tipc_sock *tsk = tipc_sk(sk);
1069         struct tipc_nlist *dsts;
1070         struct tipc_mc_method *method = &tsk->mc_method;
1071         bool ack = method->mandatory && method->rcast;
1072         int blks = tsk_blocks(MCAST_H_SIZE + dlen);
1073         struct tipc_msg *hdr = &tsk->phdr;
1074         int mtu = tipc_bcast_get_mtu(net);
1075         struct sk_buff_head pkts;
1076         int rc = -EHOSTUNREACH;
1077
1078         /* Block or return if any destination link or member is congested */
1079         rc = tipc_wait_for_cond(sock, &timeout,
1080                                 !tsk->cong_link_cnt && tsk->group &&
1081                                 !tipc_group_bc_cong(tsk->group, blks));
1082         if (unlikely(rc))
1083                 return rc;
1084
1085         dsts = tipc_group_dests(tsk->group);
1086         if (!dsts->local && !dsts->remote)
1087                 return -EHOSTUNREACH;
1088
1089         /* Complete message header */
1090         if (dest) {
1091                 msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
1092                 msg_set_nameinst(hdr, dest->addr.name.name.instance);
1093         } else {
1094                 msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
1095                 msg_set_nameinst(hdr, 0);
1096         }
1097         msg_set_hdr_sz(hdr, GROUP_H_SIZE);
1098         msg_set_destport(hdr, 0);
1099         msg_set_destnode(hdr, 0);
1100         msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(tsk->group));
1101
1102         /* Avoid getting stuck with repeated forced replicasts */
1103         msg_set_grp_bc_ack_req(hdr, ack);
1104
1105         /* Build message as chain of buffers */
1106         __skb_queue_head_init(&pkts);
1107         rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
1108         if (unlikely(rc != dlen))
1109                 return rc;
1110
1111         /* Send message */
1112         rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
1113         if (unlikely(rc))
1114                 return rc;
1115
1116         /* Update broadcast sequence number and send windows */
1117         tipc_group_update_bc_members(tsk->group, blks, ack);
1118
1119         /* Broadcast link is now free to choose method for next broadcast */
1120         method->mandatory = false;
1121         method->expires = jiffies;
1122
1123         return dlen;
1124 }
1125
1126 /**
1127  * tipc_send_group_mcast - send message to all members with given identity
1128  * @sock: socket structure
1129  * @m: message to send
1130  * @dlen: total length of message data
1131  * @timeout: timeout to wait for wakeup
1132  *
1133  * Called from function tipc_sendmsg(), which has done all sanity checks
1134  * Returns the number of bytes sent on success, or errno
1135  */
1136 static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
1137                                  int dlen, long timeout)
1138 {
1139         struct sock *sk = sock->sk;
1140         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1141         struct tipc_sock *tsk = tipc_sk(sk);
1142         struct tipc_group *grp = tsk->group;
1143         struct tipc_msg *hdr = &tsk->phdr;
1144         struct net *net = sock_net(sk);
1145         u32 type, inst, scope, exclude;
1146         struct list_head dsts;
1147         u32 dstcnt;
1148
1149         INIT_LIST_HEAD(&dsts);
1150
1151         type = msg_nametype(hdr);
1152         inst = dest->addr.name.name.instance;
1153         scope = msg_lookup_scope(hdr);
1154         exclude = tipc_group_exclude(grp);
1155
1156         if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
1157                                  &dstcnt, exclude, true))
1158                 return -EHOSTUNREACH;
1159
1160         if (dstcnt == 1) {
1161                 tipc_dest_pop(&dsts, &dest->addr.id.node, &dest->addr.id.ref);
1162                 return tipc_send_group_unicast(sock, m, dlen, timeout);
1163         }
1164
1165         tipc_dest_list_purge(&dsts);
1166         return tipc_send_group_bcast(sock, m, dlen, timeout);
1167 }
1168
1169 /**
1170  * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
1171  * @arrvq: queue with arriving messages, to be cloned after destination lookup
1172  * @inputq: queue with cloned messages, delivered to socket after dest lookup
1173  *
1174  * Multi-threaded: parallel calls with reference to same queues may occur
1175  */
1176 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
1177                        struct sk_buff_head *inputq)
1178 {
1179         u32 self = tipc_own_addr(net);
1180         u32 type, lower, upper, scope;
1181         struct sk_buff *skb, *_skb;
1182         u32 portid, onode;
1183         struct sk_buff_head tmpq;
1184         struct list_head dports;
1185         struct tipc_msg *hdr;
1186         int user, mtyp, hlen;
1187         bool exact;
1188
1189         __skb_queue_head_init(&tmpq);
1190         INIT_LIST_HEAD(&dports);
1191
1192         skb = tipc_skb_peek(arrvq, &inputq->lock);
1193         for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
1194                 hdr = buf_msg(skb);
1195                 user = msg_user(hdr);
1196                 mtyp = msg_type(hdr);
1197                 hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
1198                 onode = msg_orignode(hdr);
1199                 type = msg_nametype(hdr);
1200
1201                 if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
1202                         spin_lock_bh(&inputq->lock);
1203                         if (skb_peek(arrvq) == skb) {
1204                                 __skb_dequeue(arrvq);
1205                                 __skb_queue_tail(inputq, skb);
1206                         }
1207                         kfree_skb(skb);
1208                         spin_unlock_bh(&inputq->lock);
1209                         continue;
1210                 }
1211
1212                 /* Group messages require exact scope match */
1213                 if (msg_in_group(hdr)) {
1214                         lower = 0;
1215                         upper = ~0;
1216                         scope = msg_lookup_scope(hdr);
1217                         exact = true;
1218                 } else {
1219                         /* TIPC_NODE_SCOPE means "any scope" in this context */
1220                         if (onode == self)
1221                                 scope = TIPC_NODE_SCOPE;
1222                         else
1223                                 scope = TIPC_CLUSTER_SCOPE;
1224                         exact = false;
1225                         lower = msg_namelower(hdr);
1226                         upper = msg_nameupper(hdr);
1227                 }
1228
1229                 /* Create destination port list: */
1230                 tipc_nametbl_mc_lookup(net, type, lower, upper,
1231                                        scope, exact, &dports);
1232
1233                 /* Clone message per destination */
1234                 while (tipc_dest_pop(&dports, NULL, &portid)) {
1235                         _skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
1236                         if (_skb) {
1237                                 msg_set_destport(buf_msg(_skb), portid);
1238                                 __skb_queue_tail(&tmpq, _skb);
1239                                 continue;
1240                         }
1241                         pr_warn("Failed to clone mcast rcv buffer\n");
1242                 }
1243                 /* Append to inputq if not already done by other thread */
1244                 spin_lock_bh(&inputq->lock);
1245                 if (skb_peek(arrvq) == skb) {
1246                         skb_queue_splice_tail_init(&tmpq, inputq);
1247                         kfree_skb(__skb_dequeue(arrvq));
1248                 }
1249                 spin_unlock_bh(&inputq->lock);
1250                 __skb_queue_purge(&tmpq);
1251                 kfree_skb(skb);
1252         }
1253         tipc_sk_rcv(net, inputq);
1254 }
1255
1256 /* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
1257  *                         when socket is in Nagle mode
1258  */
1259 static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
1260 {
1261         struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
1262         struct sk_buff *skb = skb_peek_tail(txq);
1263         struct net *net = sock_net(&tsk->sk);
1264         u32 dnode = tsk_peer_node(tsk);
1265         int rc;
1266
1267         if (nagle_ack) {
1268                 tsk->pkt_cnt += skb_queue_len(txq);
1269                 if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
1270                         tsk->oneway = 0;
1271                         if (tsk->nagle_start < NAGLE_START_MAX)
1272                                 tsk->nagle_start *= 2;
1273                         tsk->expect_ack = false;
1274                         pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
1275                                  tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
1276                                  tsk->nagle_start);
1277                 } else {
1278                         tsk->nagle_start = NAGLE_START_INIT;
1279                         if (skb) {
1280                                 msg_set_ack_required(buf_msg(skb));
1281                                 tsk->expect_ack = true;
1282                         } else {
1283                                 tsk->expect_ack = false;
1284                         }
1285                 }
1286                 tsk->msg_acc = 0;
1287                 tsk->pkt_cnt = 0;
1288         }
1289
1290         if (!skb || tsk->cong_link_cnt)
1291                 return;
1292
1293         /* Do not send SYN again after congestion */
1294         if (msg_is_syn(buf_msg(skb)))
1295                 return;
1296
1297         if (tsk->msg_acc)
1298                 tsk->pkt_cnt += skb_queue_len(txq);
1299         tsk->snt_unacked += tsk->snd_backlog;
1300         tsk->snd_backlog = 0;
1301         rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
1302         if (rc == -ELINKCONG)
1303                 tsk->cong_link_cnt = 1;
1304 }
1305
1306 /**
1307  * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
1308  * @tsk: receiving socket
1309  * @skb: pointer to message buffer.
1310  */
1311 static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
1312                                    struct sk_buff_head *inputq,
1313                                    struct sk_buff_head *xmitq)
1314 {
1315         struct tipc_msg *hdr = buf_msg(skb);
1316         u32 onode = tsk_own_node(tsk);
1317         struct sock *sk = &tsk->sk;
1318         int mtyp = msg_type(hdr);
1319         bool was_cong;
1320
1321         /* Ignore if connection cannot be validated: */
1322         if (!tsk_peer_msg(tsk, hdr)) {
1323                 trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
1324                 goto exit;
1325         }
1326
1327         if (unlikely(msg_errcode(hdr))) {
1328                 tipc_set_sk_state(sk, TIPC_DISCONNECTING);
1329                 tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
1330                                       tsk_peer_port(tsk));
1331                 sk->sk_state_change(sk);
1332
1333                 /* State change is ignored if socket already awake,
1334                  * - convert msg to abort msg and add to inqueue
1335                  */
1336                 msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
1337                 msg_set_type(hdr, TIPC_CONN_MSG);
1338                 msg_set_size(hdr, BASIC_H_SIZE);
1339                 msg_set_hdr_sz(hdr, BASIC_H_SIZE);
1340                 __skb_queue_tail(inputq, skb);
1341                 return;
1342         }
1343
1344         tsk->probe_unacked = false;
1345
1346         if (mtyp == CONN_PROBE) {
1347                 msg_set_type(hdr, CONN_PROBE_REPLY);
1348                 if (tipc_msg_reverse(onode, &skb, TIPC_OK))
1349                         __skb_queue_tail(xmitq, skb);
1350                 return;
1351         } else if (mtyp == CONN_ACK) {
1352                 was_cong = tsk_conn_cong(tsk);
1353                 tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
1354                 tsk->snt_unacked -= msg_conn_ack(hdr);
1355                 if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1356                         tsk->snd_win = msg_adv_win(hdr);
1357                 if (was_cong && !tsk_conn_cong(tsk))
1358                         sk->sk_write_space(sk);
1359         } else if (mtyp != CONN_PROBE_REPLY) {
1360                 pr_warn("Received unknown CONN_PROTO msg\n");
1361         }
1362 exit:
1363         kfree_skb(skb);
1364 }
1365
1366 /**
1367  * tipc_sendmsg - send message in connectionless manner
1368  * @sock: socket structure
1369  * @m: message to send
1370  * @dsz: amount of user data to be sent
1371  *
1372  * Message must have an destination specified explicitly.
1373  * Used for SOCK_RDM and SOCK_DGRAM messages,
1374  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
1375  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
1376  *
1377  * Returns the number of bytes sent on success, or errno otherwise
1378  */
1379 static int tipc_sendmsg(struct socket *sock,
1380                         struct msghdr *m, size_t dsz)
1381 {
1382         struct sock *sk = sock->sk;
1383         int ret;
1384
1385         lock_sock(sk);
1386         ret = __tipc_sendmsg(sock, m, dsz);
1387         release_sock(sk);
1388
1389         return ret;
1390 }
1391
1392 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
1393 {
1394         struct sock *sk = sock->sk;
1395         struct net *net = sock_net(sk);
1396         struct tipc_sock *tsk = tipc_sk(sk);
1397         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1398         long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1399         struct list_head *clinks = &tsk->cong_links;
1400         bool syn = !tipc_sk_type_connectionless(sk);
1401         struct tipc_group *grp = tsk->group;
1402         struct tipc_msg *hdr = &tsk->phdr;
1403         struct tipc_name_seq *seq;
1404         struct sk_buff_head pkts;
1405         u32 dport = 0, dnode = 0;
1406         u32 type = 0, inst = 0;
1407         int mtu, rc;
1408
1409         if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
1410                 return -EMSGSIZE;
1411
1412         if (likely(dest)) {
1413                 if (unlikely(m->msg_namelen < sizeof(*dest)))
1414                         return -EINVAL;
1415                 if (unlikely(dest->family != AF_TIPC))
1416                         return -EINVAL;
1417         }
1418
1419         if (grp) {
1420                 if (!dest)
1421                         return tipc_send_group_bcast(sock, m, dlen, timeout);
1422                 if (dest->addrtype == TIPC_ADDR_NAME)
1423                         return tipc_send_group_anycast(sock, m, dlen, timeout);
1424                 if (dest->addrtype == TIPC_ADDR_ID)
1425                         return tipc_send_group_unicast(sock, m, dlen, timeout);
1426                 if (dest->addrtype == TIPC_ADDR_MCAST)
1427                         return tipc_send_group_mcast(sock, m, dlen, timeout);
1428                 return -EINVAL;
1429         }
1430
1431         if (unlikely(!dest)) {
1432                 dest = &tsk->peer;
1433                 if (!syn && dest->family != AF_TIPC)
1434                         return -EDESTADDRREQ;
1435         }
1436
1437         if (unlikely(syn)) {
1438                 if (sk->sk_state == TIPC_LISTEN)
1439                         return -EPIPE;
1440                 if (sk->sk_state != TIPC_OPEN)
1441                         return -EISCONN;
1442                 if (tsk->published)
1443                         return -EOPNOTSUPP;
1444                 if (dest->addrtype == TIPC_ADDR_NAME) {
1445                         tsk->conn_type = dest->addr.name.name.type;
1446                         tsk->conn_instance = dest->addr.name.name.instance;
1447                 }
1448                 msg_set_syn(hdr, 1);
1449         }
1450
1451         seq = &dest->addr.nameseq;
1452         if (dest->addrtype == TIPC_ADDR_MCAST)
1453                 return tipc_sendmcast(sock, seq, m, dlen, timeout);
1454
1455         if (dest->addrtype == TIPC_ADDR_NAME) {
1456                 type = dest->addr.name.name.type;
1457                 inst = dest->addr.name.name.instance;
1458                 dnode = dest->addr.name.domain;
1459                 dport = tipc_nametbl_translate(net, type, inst, &dnode);
1460                 if (unlikely(!dport && !dnode))
1461                         return -EHOSTUNREACH;
1462         } else if (dest->addrtype == TIPC_ADDR_ID) {
1463                 dnode = dest->addr.id.node;
1464         } else {
1465                 return -EINVAL;
1466         }
1467
1468         /* Block or return if destination link is congested */
1469         rc = tipc_wait_for_cond(sock, &timeout,
1470                                 !tipc_dest_find(clinks, dnode, 0));
1471         if (unlikely(rc))
1472                 return rc;
1473
1474         if (dest->addrtype == TIPC_ADDR_NAME) {
1475                 msg_set_type(hdr, TIPC_NAMED_MSG);
1476                 msg_set_hdr_sz(hdr, NAMED_H_SIZE);
1477                 msg_set_nametype(hdr, type);
1478                 msg_set_nameinst(hdr, inst);
1479                 msg_set_lookup_scope(hdr, tipc_node2scope(dnode));
1480                 msg_set_destnode(hdr, dnode);
1481                 msg_set_destport(hdr, dport);
1482         } else { /* TIPC_ADDR_ID */
1483                 msg_set_type(hdr, TIPC_DIRECT_MSG);
1484                 msg_set_lookup_scope(hdr, 0);
1485                 msg_set_destnode(hdr, dnode);
1486                 msg_set_destport(hdr, dest->addr.id.ref);
1487                 msg_set_hdr_sz(hdr, BASIC_H_SIZE);
1488         }
1489
1490         __skb_queue_head_init(&pkts);
1491         mtu = tipc_node_get_mtu(net, dnode, tsk->portid, true);
1492         rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
1493         if (unlikely(rc != dlen))
1494                 return rc;
1495         if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) {
1496                 __skb_queue_purge(&pkts);
1497                 return -ENOMEM;
1498         }
1499
1500         trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
1501         rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
1502         if (unlikely(rc == -ELINKCONG)) {
1503                 tipc_dest_push(clinks, dnode, 0);
1504                 tsk->cong_link_cnt++;
1505                 rc = 0;
1506         }
1507
1508         if (unlikely(syn && !rc))
1509                 tipc_set_sk_state(sk, TIPC_CONNECTING);
1510
1511         return rc ? rc : dlen;
1512 }
1513
1514 /**
1515  * tipc_sendstream - send stream-oriented data
1516  * @sock: socket structure
1517  * @m: data to send
1518  * @dsz: total length of data to be transmitted
1519  *
1520  * Used for SOCK_STREAM data.
1521  *
1522  * Returns the number of bytes sent on success (or partial success),
1523  * or errno if no data sent
1524  */
1525 static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
1526 {
1527         struct sock *sk = sock->sk;
1528         int ret;
1529
1530         lock_sock(sk);
1531         ret = __tipc_sendstream(sock, m, dsz);
1532         release_sock(sk);
1533
1534         return ret;
1535 }
1536
1537 static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
1538 {
1539         struct sock *sk = sock->sk;
1540         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1541         long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1542         struct sk_buff_head *txq = &sk->sk_write_queue;
1543         struct tipc_sock *tsk = tipc_sk(sk);
1544         struct tipc_msg *hdr = &tsk->phdr;
1545         struct net *net = sock_net(sk);
1546         struct sk_buff *skb;
1547         u32 dnode = tsk_peer_node(tsk);
1548         int maxnagle = tsk->maxnagle;
1549         int maxpkt = tsk->max_pkt;
1550         int send, sent = 0;
1551         int blocks, rc = 0;
1552
1553         if (unlikely(dlen > INT_MAX))
1554                 return -EMSGSIZE;
1555
1556         /* Handle implicit connection setup */
1557         if (unlikely(dest)) {
1558                 rc = __tipc_sendmsg(sock, m, dlen);
1559                 if (dlen && dlen == rc) {
1560                         tsk->peer_caps = tipc_node_get_capabilities(net, dnode);
1561                         tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
1562                 }
1563                 return rc;
1564         }
1565
1566         do {
1567                 rc = tipc_wait_for_cond(sock, &timeout,
1568                                         (!tsk->cong_link_cnt &&
1569                                          !tsk_conn_cong(tsk) &&
1570                                          tipc_sk_connected(sk)));
1571                 if (unlikely(rc))
1572                         break;
1573                 send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
1574                 blocks = tsk->snd_backlog;
1575                 if (tsk->oneway++ >= tsk->nagle_start && maxnagle &&
1576                     send <= maxnagle) {
1577                         rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
1578                         if (unlikely(rc < 0))
1579                                 break;
1580                         blocks += rc;
1581                         tsk->msg_acc++;
1582                         if (blocks <= 64 && tsk->expect_ack) {
1583                                 tsk->snd_backlog = blocks;
1584                                 sent += send;
1585                                 break;
1586                         } else if (blocks > 64) {
1587                                 tsk->pkt_cnt += skb_queue_len(txq);
1588                         } else {
1589                                 skb = skb_peek_tail(txq);
1590                                 if (skb) {
1591                                         msg_set_ack_required(buf_msg(skb));
1592                                         tsk->expect_ack = true;
1593                                 } else {
1594                                         tsk->expect_ack = false;
1595                                 }
1596                                 tsk->msg_acc = 0;
1597                                 tsk->pkt_cnt = 0;
1598                         }
1599                 } else {
1600                         rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
1601                         if (unlikely(rc != send))
1602                                 break;
1603                         blocks += tsk_inc(tsk, send + MIN_H_SIZE);
1604                 }
1605                 trace_tipc_sk_sendstream(sk, skb_peek(txq),
1606                                          TIPC_DUMP_SK_SNDQ, " ");
1607                 rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
1608                 if (unlikely(rc == -ELINKCONG)) {
1609                         tsk->cong_link_cnt = 1;
1610                         rc = 0;
1611                 }
1612                 if (likely(!rc)) {
1613                         tsk->snt_unacked += blocks;
1614                         tsk->snd_backlog = 0;
1615                         sent += send;
1616                 }
1617         } while (sent < dlen && !rc);
1618
1619         return sent ? sent : rc;
1620 }
1621
1622 /**
1623  * tipc_send_packet - send a connection-oriented message
1624  * @sock: socket structure
1625  * @m: message to send
1626  * @dsz: length of data to be transmitted
1627  *
1628  * Used for SOCK_SEQPACKET messages.
1629  *
1630  * Returns the number of bytes sent on success, or errno otherwise
1631  */
1632 static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1633 {
1634         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1635                 return -EMSGSIZE;
1636
1637         return tipc_sendstream(sock, m, dsz);
1638 }
1639
1640 /* tipc_sk_finish_conn - complete the setup of a connection
1641  */
1642 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1643                                 u32 peer_node)
1644 {
1645         struct sock *sk = &tsk->sk;
1646         struct net *net = sock_net(sk);
1647         struct tipc_msg *msg = &tsk->phdr;
1648
1649         msg_set_syn(msg, 0);
1650         msg_set_destnode(msg, peer_node);
1651         msg_set_destport(msg, peer_port);
1652         msg_set_type(msg, TIPC_CONN_MSG);
1653         msg_set_lookup_scope(msg, 0);
1654         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1655
1656         sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
1657         tipc_set_sk_state(sk, TIPC_ESTABLISHED);
1658         tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1659         tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
1660         tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
1661         tsk_set_nagle(tsk);
1662         __skb_queue_purge(&sk->sk_write_queue);
1663         if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1664                 return;
1665
1666         /* Fall back to message based flow control */
1667         tsk->rcv_win = FLOWCTL_MSG_WIN;
1668         tsk->snd_win = FLOWCTL_MSG_WIN;
1669 }
1670
1671 /**
1672  * tipc_sk_set_orig_addr - capture sender's address for received message
1673  * @m: descriptor for message info
1674  * @skb: received message
1675  *
1676  * Note: Address is not captured if not requested by receiver.
1677  */
1678 static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb)
1679 {
1680         DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name);
1681         struct tipc_msg *hdr = buf_msg(skb);
1682
1683         if (!srcaddr)
1684                 return;
1685
1686         srcaddr->sock.family = AF_TIPC;
1687         srcaddr->sock.addrtype = TIPC_ADDR_ID;
1688         srcaddr->sock.scope = 0;
1689         srcaddr->sock.addr.id.ref = msg_origport(hdr);
1690         srcaddr->sock.addr.id.node = msg_orignode(hdr);
1691         srcaddr->sock.addr.name.domain = 0;
1692         m->msg_namelen = sizeof(struct sockaddr_tipc);
1693
1694         if (!msg_in_group(hdr))
1695                 return;
1696
1697         /* Group message users may also want to know sending member's id */
1698         srcaddr->member.family = AF_TIPC;
1699         srcaddr->member.addrtype = TIPC_ADDR_NAME;
1700         srcaddr->member.scope = 0;
1701         srcaddr->member.addr.name.name.type = msg_nametype(hdr);
1702         srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member;
1703         srcaddr->member.addr.name.domain = 0;
1704         m->msg_namelen = sizeof(*srcaddr);
1705 }
1706
1707 /**
1708  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1709  * @m: descriptor for message info
1710  * @skb: received message buffer
1711  * @tsk: TIPC port associated with message
1712  *
1713  * Note: Ancillary data is not captured if not requested by receiver.
1714  *
1715  * Returns 0 if successful, otherwise errno
1716  */
1717 static int tipc_sk_anc_data_recv(struct msghdr *m, struct sk_buff *skb,
1718                                  struct tipc_sock *tsk)
1719 {
1720         struct tipc_msg *msg;
1721         u32 anc_data[3];
1722         u32 err;
1723         u32 dest_type;
1724         int has_name;
1725         int res;
1726
1727         if (likely(m->msg_controllen == 0))
1728                 return 0;
1729         msg = buf_msg(skb);
1730
1731         /* Optionally capture errored message object(s) */
1732         err = msg ? msg_errcode(msg) : 0;
1733         if (unlikely(err)) {
1734                 anc_data[0] = err;
1735                 anc_data[1] = msg_data_sz(msg);
1736                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1737                 if (res)
1738                         return res;
1739                 if (anc_data[1]) {
1740                         if (skb_linearize(skb))
1741                                 return -ENOMEM;
1742                         msg = buf_msg(skb);
1743                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1744                                        msg_data(msg));
1745                         if (res)
1746                                 return res;
1747                 }
1748         }
1749
1750         /* Optionally capture message destination object */
1751         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1752         switch (dest_type) {
1753         case TIPC_NAMED_MSG:
1754                 has_name = 1;
1755                 anc_data[0] = msg_nametype(msg);
1756                 anc_data[1] = msg_namelower(msg);
1757                 anc_data[2] = msg_namelower(msg);
1758                 break;
1759         case TIPC_MCAST_MSG:
1760                 has_name = 1;
1761                 anc_data[0] = msg_nametype(msg);
1762                 anc_data[1] = msg_namelower(msg);
1763                 anc_data[2] = msg_nameupper(msg);
1764                 break;
1765         case TIPC_CONN_MSG:
1766                 has_name = (tsk->conn_type != 0);
1767                 anc_data[0] = tsk->conn_type;
1768                 anc_data[1] = tsk->conn_instance;
1769                 anc_data[2] = tsk->conn_instance;
1770                 break;
1771         default:
1772                 has_name = 0;
1773         }
1774         if (has_name) {
1775                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1776                 if (res)
1777                         return res;
1778         }
1779
1780         return 0;
1781 }
1782
1783 static struct sk_buff *tipc_sk_build_ack(struct tipc_sock *tsk)
1784 {
1785         struct sock *sk = &tsk->sk;
1786         struct sk_buff *skb = NULL;
1787         struct tipc_msg *msg;
1788         u32 peer_port = tsk_peer_port(tsk);
1789         u32 dnode = tsk_peer_node(tsk);
1790
1791         if (!tipc_sk_connected(sk))
1792                 return NULL;
1793         skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1794                               dnode, tsk_own_node(tsk), peer_port,
1795                               tsk->portid, TIPC_OK);
1796         if (!skb)
1797                 return NULL;
1798         msg = buf_msg(skb);
1799         msg_set_conn_ack(msg, tsk->rcv_unacked);
1800         tsk->rcv_unacked = 0;
1801
1802         /* Adjust to and advertize the correct window limit */
1803         if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
1804                 tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
1805                 msg_set_adv_win(msg, tsk->rcv_win);
1806         }
1807         return skb;
1808 }
1809
1810 static void tipc_sk_send_ack(struct tipc_sock *tsk)
1811 {
1812         struct sk_buff *skb;
1813
1814         skb = tipc_sk_build_ack(tsk);
1815         if (!skb)
1816                 return;
1817
1818         tipc_node_xmit_skb(sock_net(&tsk->sk), skb, tsk_peer_node(tsk),
1819                            msg_link_selector(buf_msg(skb)));
1820 }
1821
1822 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1823 {
1824         struct sock *sk = sock->sk;
1825         DEFINE_WAIT_FUNC(wait, woken_wake_function);
1826         long timeo = *timeop;
1827         int err = sock_error(sk);
1828
1829         if (err)
1830                 return err;
1831
1832         for (;;) {
1833                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1834                         if (sk->sk_shutdown & RCV_SHUTDOWN) {
1835                                 err = -ENOTCONN;
1836                                 break;
1837                         }
1838                         add_wait_queue(sk_sleep(sk), &wait);
1839                         release_sock(sk);
1840                         timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
1841                         sched_annotate_sleep();
1842                         lock_sock(sk);
1843                         remove_wait_queue(sk_sleep(sk), &wait);
1844                 }
1845                 err = 0;
1846                 if (!skb_queue_empty(&sk->sk_receive_queue))
1847                         break;
1848                 err = -EAGAIN;
1849                 if (!timeo)
1850                         break;
1851                 err = sock_intr_errno(timeo);
1852                 if (signal_pending(current))
1853                         break;
1854
1855                 err = sock_error(sk);
1856                 if (err)
1857                         break;
1858         }
1859         *timeop = timeo;
1860         return err;
1861 }
1862
1863 /**
1864  * tipc_recvmsg - receive packet-oriented message
1865  * @m: descriptor for message info
1866  * @buflen: length of user buffer area
1867  * @flags: receive flags
1868  *
1869  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1870  * If the complete message doesn't fit in user area, truncate it.
1871  *
1872  * Returns size of returned message data, errno otherwise
1873  */
1874 static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1875                         size_t buflen,  int flags)
1876 {
1877         struct sock *sk = sock->sk;
1878         bool connected = !tipc_sk_type_connectionless(sk);
1879         struct tipc_sock *tsk = tipc_sk(sk);
1880         int rc, err, hlen, dlen, copy;
1881         struct sk_buff_head xmitq;
1882         struct tipc_msg *hdr;
1883         struct sk_buff *skb;
1884         bool grp_evt;
1885         long timeout;
1886
1887         /* Catch invalid receive requests */
1888         if (unlikely(!buflen))
1889                 return -EINVAL;
1890
1891         lock_sock(sk);
1892         if (unlikely(connected && sk->sk_state == TIPC_OPEN)) {
1893                 rc = -ENOTCONN;
1894                 goto exit;
1895         }
1896         timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1897
1898         /* Step rcv queue to first msg with data or error; wait if necessary */
1899         do {
1900                 rc = tipc_wait_for_rcvmsg(sock, &timeout);
1901                 if (unlikely(rc))
1902                         goto exit;
1903                 skb = skb_peek(&sk->sk_receive_queue);
1904                 hdr = buf_msg(skb);
1905                 dlen = msg_data_sz(hdr);
1906                 hlen = msg_hdr_sz(hdr);
1907                 err = msg_errcode(hdr);
1908                 grp_evt = msg_is_grp_evt(hdr);
1909                 if (likely(dlen || err))
1910                         break;
1911                 tsk_advance_rx_queue(sk);
1912         } while (1);
1913
1914         /* Collect msg meta data, including error code and rejected data */
1915         tipc_sk_set_orig_addr(m, skb);
1916         rc = tipc_sk_anc_data_recv(m, skb, tsk);
1917         if (unlikely(rc))
1918                 goto exit;
1919         hdr = buf_msg(skb);
1920
1921         /* Capture data if non-error msg, otherwise just set return value */
1922         if (likely(!err)) {
1923                 copy = min_t(int, dlen, buflen);
1924                 if (unlikely(copy != dlen))
1925                         m->msg_flags |= MSG_TRUNC;
1926                 rc = skb_copy_datagram_msg(skb, hlen, m, copy);
1927         } else {
1928                 copy = 0;
1929                 rc = 0;
1930                 if (err != TIPC_CONN_SHUTDOWN && connected && !m->msg_control)
1931                         rc = -ECONNRESET;
1932         }
1933         if (unlikely(rc))
1934                 goto exit;
1935
1936         /* Mark message as group event if applicable */
1937         if (unlikely(grp_evt)) {
1938                 if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
1939                         m->msg_flags |= MSG_EOR;
1940                 m->msg_flags |= MSG_OOB;
1941                 copy = 0;
1942         }
1943
1944         /* Caption of data or error code/rejected data was successful */
1945         if (unlikely(flags & MSG_PEEK))
1946                 goto exit;
1947
1948         /* Send group flow control advertisement when applicable */
1949         if (tsk->group && msg_in_group(hdr) && !grp_evt) {
1950                 __skb_queue_head_init(&xmitq);
1951                 tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
1952                                           msg_orignode(hdr), msg_origport(hdr),
1953                                           &xmitq);
1954                 tipc_node_distr_xmit(sock_net(sk), &xmitq);
1955         }
1956
1957         tsk_advance_rx_queue(sk);
1958
1959         if (likely(!connected))
1960                 goto exit;
1961
1962         /* Send connection flow control advertisement when applicable */
1963         tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
1964         if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
1965                 tipc_sk_send_ack(tsk);
1966 exit:
1967         release_sock(sk);
1968         return rc ? rc : copy;
1969 }
1970
1971 /**
1972  * tipc_recvstream - receive stream-oriented data
1973  * @m: descriptor for message info
1974  * @buflen: total size of user buffer area
1975  * @flags: receive flags
1976  *
1977  * Used for SOCK_STREAM messages only.  If not enough data is available
1978  * will optionally wait for more; never truncates data.
1979  *
1980  * Returns size of returned message data, errno otherwise
1981  */
1982 static int tipc_recvstream(struct socket *sock, struct msghdr *m,
1983                            size_t buflen, int flags)
1984 {
1985         struct sock *sk = sock->sk;
1986         struct tipc_sock *tsk = tipc_sk(sk);
1987         struct sk_buff *skb;
1988         struct tipc_msg *hdr;
1989         struct tipc_skb_cb *skb_cb;
1990         bool peek = flags & MSG_PEEK;
1991         int offset, required, copy, copied = 0;
1992         int hlen, dlen, err, rc;
1993         long timeout;
1994
1995         /* Catch invalid receive attempts */
1996         if (unlikely(!buflen))
1997                 return -EINVAL;
1998
1999         lock_sock(sk);
2000
2001         if (unlikely(sk->sk_state == TIPC_OPEN)) {
2002                 rc = -ENOTCONN;
2003                 goto exit;
2004         }
2005         required = sock_rcvlowat(sk, flags & MSG_WAITALL, buflen);
2006         timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
2007
2008         do {
2009                 /* Look at first msg in receive queue; wait if necessary */
2010                 rc = tipc_wait_for_rcvmsg(sock, &timeout);
2011                 if (unlikely(rc))
2012                         break;
2013                 skb = skb_peek(&sk->sk_receive_queue);
2014                 skb_cb = TIPC_SKB_CB(skb);
2015                 hdr = buf_msg(skb);
2016                 dlen = msg_data_sz(hdr);
2017                 hlen = msg_hdr_sz(hdr);
2018                 err = msg_errcode(hdr);
2019
2020                 /* Discard any empty non-errored (SYN-) message */
2021                 if (unlikely(!dlen && !err)) {
2022                         tsk_advance_rx_queue(sk);
2023                         continue;
2024                 }
2025
2026                 /* Collect msg meta data, incl. error code and rejected data */
2027                 if (!copied) {
2028                         tipc_sk_set_orig_addr(m, skb);
2029                         rc = tipc_sk_anc_data_recv(m, skb, tsk);
2030                         if (rc)
2031                                 break;
2032                         hdr = buf_msg(skb);
2033                 }
2034
2035                 /* Copy data if msg ok, otherwise return error/partial data */
2036                 if (likely(!err)) {
2037                         offset = skb_cb->bytes_read;
2038                         copy = min_t(int, dlen - offset, buflen - copied);
2039                         rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
2040                         if (unlikely(rc))
2041                                 break;
2042                         copied += copy;
2043                         offset += copy;
2044                         if (unlikely(offset < dlen)) {
2045                                 if (!peek)
2046                                         skb_cb->bytes_read = offset;
2047                                 break;
2048                         }
2049                 } else {
2050                         rc = 0;
2051                         if ((err != TIPC_CONN_SHUTDOWN) && !m->msg_control)
2052                                 rc = -ECONNRESET;
2053                         if (copied || rc)
2054                                 break;
2055                 }
2056
2057                 if (unlikely(peek))
2058                         break;
2059
2060                 tsk_advance_rx_queue(sk);
2061
2062                 /* Send connection flow control advertisement when applicable */
2063                 tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
2064                 if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
2065                         tipc_sk_send_ack(tsk);
2066
2067                 /* Exit if all requested data or FIN/error received */
2068                 if (copied == buflen || err)
2069                         break;
2070
2071         } while (!skb_queue_empty(&sk->sk_receive_queue) || copied < required);
2072 exit:
2073         release_sock(sk);
2074         return copied ? copied : rc;
2075 }
2076
2077 /**
2078  * tipc_write_space - wake up thread if port congestion is released
2079  * @sk: socket
2080  */
2081 static void tipc_write_space(struct sock *sk)
2082 {
2083         struct socket_wq *wq;
2084
2085         rcu_read_lock();
2086         wq = rcu_dereference(sk->sk_wq);
2087         if (skwq_has_sleeper(wq))
2088                 wake_up_interruptible_sync_poll(&wq->wait, EPOLLOUT |
2089                                                 EPOLLWRNORM | EPOLLWRBAND);
2090         rcu_read_unlock();
2091 }
2092
2093 /**
2094  * tipc_data_ready - wake up threads to indicate messages have been received
2095  * @sk: socket
2096  */
2097 static void tipc_data_ready(struct sock *sk)
2098 {
2099         struct socket_wq *wq;
2100
2101         rcu_read_lock();
2102         wq = rcu_dereference(sk->sk_wq);
2103         if (skwq_has_sleeper(wq))
2104                 wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN |
2105                                                 EPOLLRDNORM | EPOLLRDBAND);
2106         rcu_read_unlock();
2107 }
2108
2109 static void tipc_sock_destruct(struct sock *sk)
2110 {
2111         __skb_queue_purge(&sk->sk_receive_queue);
2112 }
2113
2114 static void tipc_sk_proto_rcv(struct sock *sk,
2115                               struct sk_buff_head *inputq,
2116                               struct sk_buff_head *xmitq)
2117 {
2118         struct sk_buff *skb = __skb_dequeue(inputq);
2119         struct tipc_sock *tsk = tipc_sk(sk);
2120         struct tipc_msg *hdr = buf_msg(skb);
2121         struct tipc_group *grp = tsk->group;
2122         bool wakeup = false;
2123
2124         switch (msg_user(hdr)) {
2125         case CONN_MANAGER:
2126                 tipc_sk_conn_proto_rcv(tsk, skb, inputq, xmitq);
2127                 return;
2128         case SOCK_WAKEUP:
2129                 tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
2130                 /* coupled with smp_rmb() in tipc_wait_for_cond() */
2131                 smp_wmb();
2132                 tsk->cong_link_cnt--;
2133                 wakeup = true;
2134                 tipc_sk_push_backlog(tsk, false);
2135                 break;
2136         case GROUP_PROTOCOL:
2137                 tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
2138                 break;
2139         case TOP_SRV:
2140                 tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
2141                                       hdr, inputq, xmitq);
2142                 break;
2143         default:
2144                 break;
2145         }
2146
2147         if (wakeup)
2148                 sk->sk_write_space(sk);
2149
2150         kfree_skb(skb);
2151 }
2152
2153 /**
2154  * tipc_sk_filter_connect - check incoming message for a connection-based socket
2155  * @tsk: TIPC socket
2156  * @skb: pointer to message buffer.
2157  * @xmitq: for Nagle ACK if any
2158  * Returns true if message should be added to receive queue, false otherwise
2159  */
2160 static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
2161                                    struct sk_buff_head *xmitq)
2162 {
2163         struct sock *sk = &tsk->sk;
2164         struct net *net = sock_net(sk);
2165         struct tipc_msg *hdr = buf_msg(skb);
2166         bool con_msg = msg_connected(hdr);
2167         u32 pport = tsk_peer_port(tsk);
2168         u32 pnode = tsk_peer_node(tsk);
2169         u32 oport = msg_origport(hdr);
2170         u32 onode = msg_orignode(hdr);
2171         int err = msg_errcode(hdr);
2172         unsigned long delay;
2173
2174         if (unlikely(msg_mcast(hdr)))
2175                 return false;
2176         tsk->oneway = 0;
2177
2178         switch (sk->sk_state) {
2179         case TIPC_CONNECTING:
2180                 /* Setup ACK */
2181                 if (likely(con_msg)) {
2182                         if (err)
2183                                 break;
2184                         tipc_sk_finish_conn(tsk, oport, onode);
2185                         msg_set_importance(&tsk->phdr, msg_importance(hdr));
2186                         /* ACK+ message with data is added to receive queue */
2187                         if (msg_data_sz(hdr))
2188                                 return true;
2189                         /* Empty ACK-, - wake up sleeping connect() and drop */
2190                         sk->sk_state_change(sk);
2191                         msg_set_dest_droppable(hdr, 1);
2192                         return false;
2193                 }
2194                 /* Ignore connectionless message if not from listening socket */
2195                 if (oport != pport || onode != pnode)
2196                         return false;
2197
2198                 /* Rejected SYN */
2199                 if (err != TIPC_ERR_OVERLOAD)
2200                         break;
2201
2202                 /* Prepare for new setup attempt if we have a SYN clone */
2203                 if (skb_queue_empty(&sk->sk_write_queue))
2204                         break;
2205                 get_random_bytes(&delay, 2);
2206                 delay %= (tsk->conn_timeout / 4);
2207                 delay = msecs_to_jiffies(delay + 100);
2208                 sk_reset_timer(sk, &sk->sk_timer, jiffies + delay);
2209                 return false;
2210         case TIPC_OPEN:
2211         case TIPC_DISCONNECTING:
2212                 return false;
2213         case TIPC_LISTEN:
2214                 /* Accept only SYN message */
2215                 if (!msg_is_syn(hdr) &&
2216                     tipc_node_get_capabilities(net, onode) & TIPC_SYN_BIT)
2217                         return false;
2218                 if (!con_msg && !err)
2219                         return true;
2220                 return false;
2221         case TIPC_ESTABLISHED:
2222                 if (!skb_queue_empty(&sk->sk_write_queue))
2223                         tipc_sk_push_backlog(tsk, false);
2224                 /* Accept only connection-based messages sent by peer */
2225                 if (likely(con_msg && !err && pport == oport &&
2226                            pnode == onode)) {
2227                         if (msg_ack_required(hdr)) {
2228                                 struct sk_buff *skb;
2229
2230                                 skb = tipc_sk_build_ack(tsk);
2231                                 if (skb) {
2232                                         msg_set_nagle_ack(buf_msg(skb));
2233                                         __skb_queue_tail(xmitq, skb);
2234                                 }
2235                         }
2236                         return true;
2237                 }
2238                 if (!tsk_peer_msg(tsk, hdr))
2239                         return false;
2240                 if (!err)
2241                         return true;
2242                 tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2243                 tipc_node_remove_conn(net, pnode, tsk->portid);
2244                 sk->sk_state_change(sk);
2245                 return true;
2246         default:
2247                 pr_err("Unknown sk_state %u\n", sk->sk_state);
2248         }
2249         /* Abort connection setup attempt */
2250         tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2251         sk->sk_err = ECONNREFUSED;
2252         sk->sk_state_change(sk);
2253         return true;
2254 }
2255
2256 /**
2257  * rcvbuf_limit - get proper overload limit of socket receive queue
2258  * @sk: socket
2259  * @skb: message
2260  *
2261  * For connection oriented messages, irrespective of importance,
2262  * default queue limit is 2 MB.
2263  *
2264  * For connectionless messages, queue limits are based on message
2265  * importance as follows:
2266  *
2267  * TIPC_LOW_IMPORTANCE       (2 MB)
2268  * TIPC_MEDIUM_IMPORTANCE    (4 MB)
2269  * TIPC_HIGH_IMPORTANCE      (8 MB)
2270  * TIPC_CRITICAL_IMPORTANCE  (16 MB)
2271  *
2272  * Returns overload limit according to corresponding message importance
2273  */
2274 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
2275 {
2276         struct tipc_sock *tsk = tipc_sk(sk);
2277         struct tipc_msg *hdr = buf_msg(skb);
2278
2279         if (unlikely(msg_in_group(hdr)))
2280                 return READ_ONCE(sk->sk_rcvbuf);
2281
2282         if (unlikely(!msg_connected(hdr)))
2283                 return READ_ONCE(sk->sk_rcvbuf) << msg_importance(hdr);
2284
2285         if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
2286                 return READ_ONCE(sk->sk_rcvbuf);
2287
2288         return FLOWCTL_MSG_LIM;
2289 }
2290
2291 /**
2292  * tipc_sk_filter_rcv - validate incoming message
2293  * @sk: socket
2294  * @skb: pointer to message.
2295  *
2296  * Enqueues message on receive queue if acceptable; optionally handles
2297  * disconnect indication for a connected socket.
2298  *
2299  * Called with socket lock already taken
2300  *
2301  */
2302 static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
2303                                struct sk_buff_head *xmitq)
2304 {
2305         bool sk_conn = !tipc_sk_type_connectionless(sk);
2306         struct tipc_sock *tsk = tipc_sk(sk);
2307         struct tipc_group *grp = tsk->group;
2308         struct tipc_msg *hdr = buf_msg(skb);
2309         struct net *net = sock_net(sk);
2310         struct sk_buff_head inputq;
2311         int mtyp = msg_type(hdr);
2312         int limit, err = TIPC_OK;
2313
2314         trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " ");
2315         TIPC_SKB_CB(skb)->bytes_read = 0;
2316         __skb_queue_head_init(&inputq);
2317         __skb_queue_tail(&inputq, skb);
2318
2319         if (unlikely(!msg_isdata(hdr)))
2320                 tipc_sk_proto_rcv(sk, &inputq, xmitq);
2321
2322         if (unlikely(grp))
2323                 tipc_group_filter_msg(grp, &inputq, xmitq);
2324
2325         if (unlikely(!grp) && mtyp == TIPC_MCAST_MSG)
2326                 tipc_mcast_filter_msg(net, &tsk->mc_method.deferredq, &inputq);
2327
2328         /* Validate and add to receive buffer if there is space */
2329         while ((skb = __skb_dequeue(&inputq))) {
2330                 hdr = buf_msg(skb);
2331                 limit = rcvbuf_limit(sk, skb);
2332                 if ((sk_conn && !tipc_sk_filter_connect(tsk, skb, xmitq)) ||
2333                     (!sk_conn && msg_connected(hdr)) ||
2334                     (!grp && msg_in_group(hdr)))
2335                         err = TIPC_ERR_NO_PORT;
2336                 else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
2337                         trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL,
2338                                            "err_overload2!");
2339                         atomic_inc(&sk->sk_drops);
2340                         err = TIPC_ERR_OVERLOAD;
2341                 }
2342
2343                 if (unlikely(err)) {
2344                         if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) {
2345                                 trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE,
2346                                                       "@filter_rcv!");
2347                                 __skb_queue_tail(xmitq, skb);
2348                         }
2349                         err = TIPC_OK;
2350                         continue;
2351                 }
2352                 __skb_queue_tail(&sk->sk_receive_queue, skb);
2353                 skb_set_owner_r(skb, sk);
2354                 trace_tipc_sk_overlimit2(sk, skb, TIPC_DUMP_ALL,
2355                                          "rcvq >90% allocated!");
2356                 sk->sk_data_ready(sk);
2357         }
2358 }
2359
2360 /**
2361  * tipc_sk_backlog_rcv - handle incoming message from backlog queue
2362  * @sk: socket
2363  * @skb: message
2364  *
2365  * Caller must hold socket lock
2366  */
2367 static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
2368 {
2369         unsigned int before = sk_rmem_alloc_get(sk);
2370         struct sk_buff_head xmitq;
2371         unsigned int added;
2372
2373         __skb_queue_head_init(&xmitq);
2374
2375         tipc_sk_filter_rcv(sk, skb, &xmitq);
2376         added = sk_rmem_alloc_get(sk) - before;
2377         atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt);
2378
2379         /* Send pending response/rejected messages, if any */
2380         tipc_node_distr_xmit(sock_net(sk), &xmitq);
2381         return 0;
2382 }
2383
2384 /**
2385  * tipc_sk_enqueue - extract all buffers with destination 'dport' from
2386  *                   inputq and try adding them to socket or backlog queue
2387  * @inputq: list of incoming buffers with potentially different destinations
2388  * @sk: socket where the buffers should be enqueued
2389  * @dport: port number for the socket
2390  *
2391  * Caller must hold socket lock
2392  */
2393 static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
2394                             u32 dport, struct sk_buff_head *xmitq)
2395 {
2396         unsigned long time_limit = jiffies + 2;
2397         struct sk_buff *skb;
2398         unsigned int lim;
2399         atomic_t *dcnt;
2400         u32 onode;
2401
2402         while (skb_queue_len(inputq)) {
2403                 if (unlikely(time_after_eq(jiffies, time_limit)))
2404                         return;
2405
2406                 skb = tipc_skb_dequeue(inputq, dport);
2407                 if (unlikely(!skb))
2408                         return;
2409
2410                 /* Add message directly to receive queue if possible */
2411                 if (!sock_owned_by_user(sk)) {
2412                         tipc_sk_filter_rcv(sk, skb, xmitq);
2413                         continue;
2414                 }
2415
2416                 /* Try backlog, compensating for double-counted bytes */
2417                 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
2418                 if (!sk->sk_backlog.len)
2419                         atomic_set(dcnt, 0);
2420                 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
2421                 if (likely(!sk_add_backlog(sk, skb, lim))) {
2422                         trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL,
2423                                                  "bklg & rcvq >90% allocated!");
2424                         continue;
2425                 }
2426
2427                 trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!");
2428                 /* Overload => reject message back to sender */
2429                 onode = tipc_own_addr(sock_net(sk));
2430                 atomic_inc(&sk->sk_drops);
2431                 if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD)) {
2432                         trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_ALL,
2433                                               "@sk_enqueue!");
2434                         __skb_queue_tail(xmitq, skb);
2435                 }
2436                 break;
2437         }
2438 }
2439
2440 /**
2441  * tipc_sk_rcv - handle a chain of incoming buffers
2442  * @inputq: buffer list containing the buffers
2443  * Consumes all buffers in list until inputq is empty
2444  * Note: may be called in multiple threads referring to the same queue
2445  */
2446 void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
2447 {
2448         struct sk_buff_head xmitq;
2449         u32 dnode, dport = 0;
2450         int err;
2451         struct tipc_sock *tsk;
2452         struct sock *sk;
2453         struct sk_buff *skb;
2454
2455         __skb_queue_head_init(&xmitq);
2456         while (skb_queue_len(inputq)) {
2457                 dport = tipc_skb_peek_port(inputq, dport);
2458                 tsk = tipc_sk_lookup(net, dport);
2459
2460                 if (likely(tsk)) {
2461                         sk = &tsk->sk;
2462                         if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
2463                                 tipc_sk_enqueue(inputq, sk, dport, &xmitq);
2464                                 spin_unlock_bh(&sk->sk_lock.slock);
2465                         }
2466                         /* Send pending response/rejected messages, if any */
2467                         tipc_node_distr_xmit(sock_net(sk), &xmitq);
2468                         sock_put(sk);
2469                         continue;
2470                 }
2471                 /* No destination socket => dequeue skb if still there */
2472                 skb = tipc_skb_dequeue(inputq, dport);
2473                 if (!skb)
2474                         return;
2475
2476                 /* Try secondary lookup if unresolved named message */
2477                 err = TIPC_ERR_NO_PORT;
2478                 if (tipc_msg_lookup_dest(net, skb, &err))
2479                         goto xmit;
2480
2481                 /* Prepare for message rejection */
2482                 if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
2483                         continue;
2484
2485                 trace_tipc_sk_rej_msg(NULL, skb, TIPC_DUMP_NONE, "@sk_rcv!");
2486 xmit:
2487                 dnode = msg_destnode(buf_msg(skb));
2488                 tipc_node_xmit_skb(net, skb, dnode, dport);
2489         }
2490 }
2491
2492 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
2493 {
2494         DEFINE_WAIT_FUNC(wait, woken_wake_function);
2495         struct sock *sk = sock->sk;
2496         int done;
2497
2498         do {
2499                 int err = sock_error(sk);
2500                 if (err)
2501                         return err;
2502                 if (!*timeo_p)
2503                         return -ETIMEDOUT;
2504                 if (signal_pending(current))
2505                         return sock_intr_errno(*timeo_p);
2506                 if (sk->sk_state == TIPC_DISCONNECTING)
2507                         break;
2508
2509                 add_wait_queue(sk_sleep(sk), &wait);
2510                 done = sk_wait_event(sk, timeo_p, tipc_sk_connected(sk),
2511                                      &wait);
2512                 remove_wait_queue(sk_sleep(sk), &wait);
2513         } while (!done);
2514         return 0;
2515 }
2516
2517 static bool tipc_sockaddr_is_sane(struct sockaddr_tipc *addr)
2518 {
2519         if (addr->family != AF_TIPC)
2520                 return false;
2521         if (addr->addrtype == TIPC_SERVICE_RANGE)
2522                 return (addr->addr.nameseq.lower <= addr->addr.nameseq.upper);
2523         return (addr->addrtype == TIPC_SERVICE_ADDR ||
2524                 addr->addrtype == TIPC_SOCKET_ADDR);
2525 }
2526
2527 /**
2528  * tipc_connect - establish a connection to another TIPC port
2529  * @sock: socket structure
2530  * @dest: socket address for destination port
2531  * @destlen: size of socket address data structure
2532  * @flags: file-related flags associated with socket
2533  *
2534  * Returns 0 on success, errno otherwise
2535  */
2536 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
2537                         int destlen, int flags)
2538 {
2539         struct sock *sk = sock->sk;
2540         struct tipc_sock *tsk = tipc_sk(sk);
2541         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
2542         struct msghdr m = {NULL,};
2543         long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
2544         int previous;
2545         int res = 0;
2546
2547         if (destlen != sizeof(struct sockaddr_tipc))
2548                 return -EINVAL;
2549
2550         lock_sock(sk);
2551
2552         if (tsk->group) {
2553                 res = -EINVAL;
2554                 goto exit;
2555         }
2556
2557         if (dst->family == AF_UNSPEC) {
2558                 memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
2559                 if (!tipc_sk_type_connectionless(sk))
2560                         res = -EINVAL;
2561                 goto exit;
2562         }
2563         if (!tipc_sockaddr_is_sane(dst)) {
2564                 res = -EINVAL;
2565                 goto exit;
2566         }
2567         /* DGRAM/RDM connect(), just save the destaddr */
2568         if (tipc_sk_type_connectionless(sk)) {
2569                 memcpy(&tsk->peer, dest, destlen);
2570                 goto exit;
2571         } else if (dst->addrtype == TIPC_SERVICE_RANGE) {
2572                 res = -EINVAL;
2573                 goto exit;
2574         }
2575
2576         previous = sk->sk_state;
2577
2578         switch (sk->sk_state) {
2579         case TIPC_OPEN:
2580                 /* Send a 'SYN-' to destination */
2581                 m.msg_name = dest;
2582                 m.msg_namelen = destlen;
2583
2584                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
2585                  * indicate send_msg() is never blocked.
2586                  */
2587                 if (!timeout)
2588                         m.msg_flags = MSG_DONTWAIT;
2589
2590                 res = __tipc_sendmsg(sock, &m, 0);
2591                 if ((res < 0) && (res != -EWOULDBLOCK))
2592                         goto exit;
2593
2594                 /* Just entered TIPC_CONNECTING state; the only
2595                  * difference is that return value in non-blocking
2596                  * case is EINPROGRESS, rather than EALREADY.
2597                  */
2598                 res = -EINPROGRESS;
2599                 fallthrough;
2600         case TIPC_CONNECTING:
2601                 if (!timeout) {
2602                         if (previous == TIPC_CONNECTING)
2603                                 res = -EALREADY;
2604                         goto exit;
2605                 }
2606                 timeout = msecs_to_jiffies(timeout);
2607                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
2608                 res = tipc_wait_for_connect(sock, &timeout);
2609                 break;
2610         case TIPC_ESTABLISHED:
2611                 res = -EISCONN;
2612                 break;
2613         default:
2614                 res = -EINVAL;
2615         }
2616
2617 exit:
2618         release_sock(sk);
2619         return res;
2620 }
2621
2622 /**
2623  * tipc_listen - allow socket to listen for incoming connections
2624  * @sock: socket structure
2625  * @len: (unused)
2626  *
2627  * Returns 0 on success, errno otherwise
2628  */
2629 static int tipc_listen(struct socket *sock, int len)
2630 {
2631         struct sock *sk = sock->sk;
2632         int res;
2633
2634         lock_sock(sk);
2635         res = tipc_set_sk_state(sk, TIPC_LISTEN);
2636         release_sock(sk);
2637
2638         return res;
2639 }
2640
2641 static int tipc_wait_for_accept(struct socket *sock, long timeo)
2642 {
2643         struct sock *sk = sock->sk;
2644         DEFINE_WAIT(wait);
2645         int err;
2646
2647         /* True wake-one mechanism for incoming connections: only
2648          * one process gets woken up, not the 'whole herd'.
2649          * Since we do not 'race & poll' for established sockets
2650          * anymore, the common case will execute the loop only once.
2651         */
2652         for (;;) {
2653                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
2654                                           TASK_INTERRUPTIBLE);
2655                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
2656                         release_sock(sk);
2657                         timeo = schedule_timeout(timeo);
2658                         lock_sock(sk);
2659                 }
2660                 err = 0;
2661                 if (!skb_queue_empty(&sk->sk_receive_queue))
2662                         break;
2663                 err = -EAGAIN;
2664                 if (!timeo)
2665                         break;
2666                 err = sock_intr_errno(timeo);
2667                 if (signal_pending(current))
2668                         break;
2669         }
2670         finish_wait(sk_sleep(sk), &wait);
2671         return err;
2672 }
2673
2674 /**
2675  * tipc_accept - wait for connection request
2676  * @sock: listening socket
2677  * @new_sock: new socket that is to be connected
2678  * @flags: file-related flags associated with socket
2679  *
2680  * Returns 0 on success, errno otherwise
2681  */
2682 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
2683                        bool kern)
2684 {
2685         struct sock *new_sk, *sk = sock->sk;
2686         struct sk_buff *buf;
2687         struct tipc_sock *new_tsock;
2688         struct tipc_msg *msg;
2689         long timeo;
2690         int res;
2691
2692         lock_sock(sk);
2693
2694         if (sk->sk_state != TIPC_LISTEN) {
2695                 res = -EINVAL;
2696                 goto exit;
2697         }
2698         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2699         res = tipc_wait_for_accept(sock, timeo);
2700         if (res)
2701                 goto exit;
2702
2703         buf = skb_peek(&sk->sk_receive_queue);
2704
2705         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, kern);
2706         if (res)
2707                 goto exit;
2708         security_sk_clone(sock->sk, new_sock->sk);
2709
2710         new_sk = new_sock->sk;
2711         new_tsock = tipc_sk(new_sk);
2712         msg = buf_msg(buf);
2713
2714         /* we lock on new_sk; but lockdep sees the lock on sk */
2715         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2716
2717         /*
2718          * Reject any stray messages received by new socket
2719          * before the socket lock was taken (very, very unlikely)
2720          */
2721         tsk_rej_rx_queue(new_sk, TIPC_ERR_NO_PORT);
2722
2723         /* Connect new socket to it's peer */
2724         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2725
2726         tsk_set_importance(new_sk, msg_importance(msg));
2727         if (msg_named(msg)) {
2728                 new_tsock->conn_type = msg_nametype(msg);
2729                 new_tsock->conn_instance = msg_nameinst(msg);
2730         }
2731
2732         /*
2733          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2734          * Respond to 'SYN+' by queuing it on new socket.
2735          */
2736         if (!msg_data_sz(msg)) {
2737                 struct msghdr m = {NULL,};
2738
2739                 tsk_advance_rx_queue(sk);
2740                 __tipc_sendstream(new_sock, &m, 0);
2741         } else {
2742                 __skb_dequeue(&sk->sk_receive_queue);
2743                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2744                 skb_set_owner_r(buf, new_sk);
2745         }
2746         release_sock(new_sk);
2747 exit:
2748         release_sock(sk);
2749         return res;
2750 }
2751
2752 /**
2753  * tipc_shutdown - shutdown socket connection
2754  * @sock: socket structure
2755  * @how: direction to close (must be SHUT_RDWR)
2756  *
2757  * Terminates connection (if necessary), then purges socket's receive queue.
2758  *
2759  * Returns 0 on success, errno otherwise
2760  */
2761 static int tipc_shutdown(struct socket *sock, int how)
2762 {
2763         struct sock *sk = sock->sk;
2764         int res;
2765
2766         if (how != SHUT_RDWR)
2767                 return -EINVAL;
2768
2769         lock_sock(sk);
2770
2771         trace_tipc_sk_shutdown(sk, NULL, TIPC_DUMP_ALL, " ");
2772         __tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
2773         sk->sk_shutdown = SEND_SHUTDOWN;
2774
2775         if (sk->sk_state == TIPC_DISCONNECTING) {
2776                 /* Discard any unreceived messages */
2777                 __skb_queue_purge(&sk->sk_receive_queue);
2778
2779                 /* Wake up anyone sleeping in poll */
2780                 sk->sk_state_change(sk);
2781                 res = 0;
2782         } else {
2783                 res = -ENOTCONN;
2784         }
2785
2786         release_sock(sk);
2787         return res;
2788 }
2789
2790 static void tipc_sk_check_probing_state(struct sock *sk,
2791                                         struct sk_buff_head *list)
2792 {
2793         struct tipc_sock *tsk = tipc_sk(sk);
2794         u32 pnode = tsk_peer_node(tsk);
2795         u32 pport = tsk_peer_port(tsk);
2796         u32 self = tsk_own_node(tsk);
2797         u32 oport = tsk->portid;
2798         struct sk_buff *skb;
2799
2800         if (tsk->probe_unacked) {
2801                 tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2802                 sk->sk_err = ECONNABORTED;
2803                 tipc_node_remove_conn(sock_net(sk), pnode, pport);
2804                 sk->sk_state_change(sk);
2805                 return;
2806         }
2807         /* Prepare new probe */
2808         skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 0,
2809                               pnode, self, pport, oport, TIPC_OK);
2810         if (skb)
2811                 __skb_queue_tail(list, skb);
2812         tsk->probe_unacked = true;
2813         sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
2814 }
2815
2816 static void tipc_sk_retry_connect(struct sock *sk, struct sk_buff_head *list)
2817 {
2818         struct tipc_sock *tsk = tipc_sk(sk);
2819
2820         /* Try again later if dest link is congested */
2821         if (tsk->cong_link_cnt) {
2822                 sk_reset_timer(sk, &sk->sk_timer, msecs_to_jiffies(100));
2823                 return;
2824         }
2825         /* Prepare SYN for retransmit */
2826         tipc_msg_skb_clone(&sk->sk_write_queue, list);
2827 }
2828
2829 static void tipc_sk_timeout(struct timer_list *t)
2830 {
2831         struct sock *sk = from_timer(sk, t, sk_timer);
2832         struct tipc_sock *tsk = tipc_sk(sk);
2833         u32 pnode = tsk_peer_node(tsk);
2834         struct sk_buff_head list;
2835         int rc = 0;
2836
2837         __skb_queue_head_init(&list);
2838         bh_lock_sock(sk);
2839
2840         /* Try again later if socket is busy */
2841         if (sock_owned_by_user(sk)) {
2842                 sk_reset_timer(sk, &sk->sk_timer, jiffies + HZ / 20);
2843                 bh_unlock_sock(sk);
2844                 sock_put(sk);
2845                 return;
2846         }
2847
2848         if (sk->sk_state == TIPC_ESTABLISHED)
2849                 tipc_sk_check_probing_state(sk, &list);
2850         else if (sk->sk_state == TIPC_CONNECTING)
2851                 tipc_sk_retry_connect(sk, &list);
2852
2853         bh_unlock_sock(sk);
2854
2855         if (!skb_queue_empty(&list))
2856                 rc = tipc_node_xmit(sock_net(sk), &list, pnode, tsk->portid);
2857
2858         /* SYN messages may cause link congestion */
2859         if (rc == -ELINKCONG) {
2860                 tipc_dest_push(&tsk->cong_links, pnode, 0);
2861                 tsk->cong_link_cnt = 1;
2862         }
2863         sock_put(sk);
2864 }
2865
2866 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2867                            struct tipc_name_seq const *seq)
2868 {
2869         struct sock *sk = &tsk->sk;
2870         struct net *net = sock_net(sk);
2871         struct publication *publ;
2872         u32 key;
2873
2874         if (scope != TIPC_NODE_SCOPE)
2875                 scope = TIPC_CLUSTER_SCOPE;
2876
2877         if (tipc_sk_connected(sk))
2878                 return -EINVAL;
2879         key = tsk->portid + tsk->pub_count + 1;
2880         if (key == tsk->portid)
2881                 return -EADDRINUSE;
2882
2883         publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2884                                     scope, tsk->portid, key);
2885         if (unlikely(!publ))
2886                 return -EINVAL;
2887
2888         list_add(&publ->binding_sock, &tsk->publications);
2889         tsk->pub_count++;
2890         tsk->published = 1;
2891         return 0;
2892 }
2893
2894 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2895                             struct tipc_name_seq const *seq)
2896 {
2897         struct net *net = sock_net(&tsk->sk);
2898         struct publication *publ;
2899         struct publication *safe;
2900         int rc = -EINVAL;
2901
2902         if (scope != TIPC_NODE_SCOPE)
2903                 scope = TIPC_CLUSTER_SCOPE;
2904
2905         list_for_each_entry_safe(publ, safe, &tsk->publications, binding_sock) {
2906                 if (seq) {
2907                         if (publ->scope != scope)
2908                                 continue;
2909                         if (publ->type != seq->type)
2910                                 continue;
2911                         if (publ->lower != seq->lower)
2912                                 continue;
2913                         if (publ->upper != seq->upper)
2914                                 break;
2915                         tipc_nametbl_withdraw(net, publ->type, publ->lower,
2916                                               publ->upper, publ->key);
2917                         rc = 0;
2918                         break;
2919                 }
2920                 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2921                                       publ->upper, publ->key);
2922                 rc = 0;
2923         }
2924         if (list_empty(&tsk->publications))
2925                 tsk->published = 0;
2926         return rc;
2927 }
2928
2929 /* tipc_sk_reinit: set non-zero address in all existing sockets
2930  *                 when we go from standalone to network mode.
2931  */
2932 void tipc_sk_reinit(struct net *net)
2933 {
2934         struct tipc_net *tn = net_generic(net, tipc_net_id);
2935         struct rhashtable_iter iter;
2936         struct tipc_sock *tsk;
2937         struct tipc_msg *msg;
2938
2939         rhashtable_walk_enter(&tn->sk_rht, &iter);
2940
2941         do {
2942                 rhashtable_walk_start(&iter);
2943
2944                 while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
2945                         sock_hold(&tsk->sk);
2946                         rhashtable_walk_stop(&iter);
2947                         lock_sock(&tsk->sk);
2948                         msg = &tsk->phdr;
2949                         msg_set_prevnode(msg, tipc_own_addr(net));
2950                         msg_set_orignode(msg, tipc_own_addr(net));
2951                         release_sock(&tsk->sk);
2952                         rhashtable_walk_start(&iter);
2953                         sock_put(&tsk->sk);
2954                 }
2955
2956                 rhashtable_walk_stop(&iter);
2957         } while (tsk == ERR_PTR(-EAGAIN));
2958
2959         rhashtable_walk_exit(&iter);
2960 }
2961
2962 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2963 {
2964         struct tipc_net *tn = net_generic(net, tipc_net_id);
2965         struct tipc_sock *tsk;
2966
2967         rcu_read_lock();
2968         tsk = rhashtable_lookup(&tn->sk_rht, &portid, tsk_rht_params);
2969         if (tsk)
2970                 sock_hold(&tsk->sk);
2971         rcu_read_unlock();
2972
2973         return tsk;
2974 }
2975
2976 static int tipc_sk_insert(struct tipc_sock *tsk)
2977 {
2978         struct sock *sk = &tsk->sk;
2979         struct net *net = sock_net(sk);
2980         struct tipc_net *tn = net_generic(net, tipc_net_id);
2981         u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2982         u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2983
2984         while (remaining--) {
2985                 portid++;
2986                 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2987                         portid = TIPC_MIN_PORT;
2988                 tsk->portid = portid;
2989                 sock_hold(&tsk->sk);
2990                 if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
2991                                                    tsk_rht_params))
2992                         return 0;
2993                 sock_put(&tsk->sk);
2994         }
2995
2996         return -1;
2997 }
2998
2999 static void tipc_sk_remove(struct tipc_sock *tsk)
3000 {
3001         struct sock *sk = &tsk->sk;
3002         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
3003
3004         if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
3005                 WARN_ON(refcount_read(&sk->sk_refcnt) == 1);
3006                 __sock_put(sk);
3007         }
3008 }
3009
3010 static const struct rhashtable_params tsk_rht_params = {
3011         .nelem_hint = 192,
3012         .head_offset = offsetof(struct tipc_sock, node),
3013         .key_offset = offsetof(struct tipc_sock, portid),
3014         .key_len = sizeof(u32), /* portid */
3015         .max_size = 1048576,
3016         .min_size = 256,
3017         .automatic_shrinking = true,
3018 };
3019
3020 int tipc_sk_rht_init(struct net *net)
3021 {
3022         struct tipc_net *tn = net_generic(net, tipc_net_id);
3023
3024         return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
3025 }
3026
3027 void tipc_sk_rht_destroy(struct net *net)
3028 {
3029         struct tipc_net *tn = net_generic(net, tipc_net_id);
3030
3031         /* Wait for socket readers to complete */
3032         synchronize_net();
3033
3034         rhashtable_destroy(&tn->sk_rht);
3035 }
3036
3037 static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
3038 {
3039         struct net *net = sock_net(&tsk->sk);
3040         struct tipc_group *grp = tsk->group;
3041         struct tipc_msg *hdr = &tsk->phdr;
3042         struct tipc_name_seq seq;
3043         int rc;
3044
3045         if (mreq->type < TIPC_RESERVED_TYPES)
3046                 return -EACCES;
3047         if (mreq->scope > TIPC_NODE_SCOPE)
3048                 return -EINVAL;
3049         if (grp)
3050                 return -EACCES;
3051         grp = tipc_group_create(net, tsk->portid, mreq, &tsk->group_is_open);
3052         if (!grp)
3053                 return -ENOMEM;
3054         tsk->group = grp;
3055         msg_set_lookup_scope(hdr, mreq->scope);
3056         msg_set_nametype(hdr, mreq->type);
3057         msg_set_dest_droppable(hdr, true);
3058         seq.type = mreq->type;
3059         seq.lower = mreq->instance;
3060         seq.upper = seq.lower;
3061         tipc_nametbl_build_group(net, grp, mreq->type, mreq->scope);
3062         rc = tipc_sk_publish(tsk, mreq->scope, &seq);
3063         if (rc) {
3064                 tipc_group_delete(net, grp);
3065                 tsk->group = NULL;
3066                 return rc;
3067         }
3068         /* Eliminate any risk that a broadcast overtakes sent JOINs */
3069         tsk->mc_method.rcast = true;
3070         tsk->mc_method.mandatory = true;
3071         tipc_group_join(net, grp, &tsk->sk.sk_rcvbuf);
3072         return rc;
3073 }
3074
3075 static int tipc_sk_leave(struct tipc_sock *tsk)
3076 {
3077         struct net *net = sock_net(&tsk->sk);
3078         struct tipc_group *grp = tsk->group;
3079         struct tipc_name_seq seq;
3080         int scope;
3081
3082         if (!grp)
3083                 return -EINVAL;
3084         tipc_group_self(grp, &seq, &scope);
3085         tipc_group_delete(net, grp);
3086         tsk->group = NULL;
3087         tipc_sk_withdraw(tsk, scope, &seq);
3088         return 0;
3089 }
3090
3091 /**
3092  * tipc_setsockopt - set socket option
3093  * @sock: socket structure
3094  * @lvl: option level
3095  * @opt: option identifier
3096  * @ov: pointer to new option value
3097  * @ol: length of option value
3098  *
3099  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
3100  * (to ease compatibility).
3101  *
3102  * Returns 0 on success, errno otherwise
3103  */
3104 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
3105                            sockptr_t ov, unsigned int ol)
3106 {
3107         struct sock *sk = sock->sk;
3108         struct tipc_sock *tsk = tipc_sk(sk);
3109         struct tipc_group_req mreq;
3110         u32 value = 0;
3111         int res = 0;
3112
3113         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
3114                 return 0;
3115         if (lvl != SOL_TIPC)
3116                 return -ENOPROTOOPT;
3117
3118         switch (opt) {
3119         case TIPC_IMPORTANCE:
3120         case TIPC_SRC_DROPPABLE:
3121         case TIPC_DEST_DROPPABLE:
3122         case TIPC_CONN_TIMEOUT:
3123         case TIPC_NODELAY:
3124                 if (ol < sizeof(value))
3125                         return -EINVAL;
3126                 if (copy_from_sockptr(&value, ov, sizeof(u32)))
3127                         return -EFAULT;
3128                 break;
3129         case TIPC_GROUP_JOIN:
3130                 if (ol < sizeof(mreq))
3131                         return -EINVAL;
3132                 if (copy_from_sockptr(&mreq, ov, sizeof(mreq)))
3133                         return -EFAULT;
3134                 break;
3135         default:
3136                 if (!sockptr_is_null(ov) || ol)
3137                         return -EINVAL;
3138         }
3139
3140         lock_sock(sk);
3141
3142         switch (opt) {
3143         case TIPC_IMPORTANCE:
3144                 res = tsk_set_importance(sk, value);
3145                 break;
3146         case TIPC_SRC_DROPPABLE:
3147                 if (sock->type != SOCK_STREAM)
3148                         tsk_set_unreliable(tsk, value);
3149                 else
3150                         res = -ENOPROTOOPT;
3151                 break;
3152         case TIPC_DEST_DROPPABLE:
3153                 tsk_set_unreturnable(tsk, value);
3154                 break;
3155         case TIPC_CONN_TIMEOUT:
3156                 tipc_sk(sk)->conn_timeout = value;
3157                 break;
3158         case TIPC_MCAST_BROADCAST:
3159                 tsk->mc_method.rcast = false;
3160                 tsk->mc_method.mandatory = true;
3161                 break;
3162         case TIPC_MCAST_REPLICAST:
3163                 tsk->mc_method.rcast = true;
3164                 tsk->mc_method.mandatory = true;
3165                 break;
3166         case TIPC_GROUP_JOIN:
3167                 res = tipc_sk_join(tsk, &mreq);
3168                 break;
3169         case TIPC_GROUP_LEAVE:
3170                 res = tipc_sk_leave(tsk);
3171                 break;
3172         case TIPC_NODELAY:
3173                 tsk->nodelay = !!value;
3174                 tsk_set_nagle(tsk);
3175                 break;
3176         default:
3177                 res = -EINVAL;
3178         }
3179
3180         release_sock(sk);
3181
3182         return res;
3183 }
3184
3185 /**
3186  * tipc_getsockopt - get socket option
3187  * @sock: socket structure
3188  * @lvl: option level
3189  * @opt: option identifier
3190  * @ov: receptacle for option value
3191  * @ol: receptacle for length of option value
3192  *
3193  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
3194  * (to ease compatibility).
3195  *
3196  * Returns 0 on success, errno otherwise
3197  */
3198 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
3199                            char __user *ov, int __user *ol)
3200 {
3201         struct sock *sk = sock->sk;
3202         struct tipc_sock *tsk = tipc_sk(sk);
3203         struct tipc_name_seq seq;
3204         int len, scope;
3205         u32 value;
3206         int res;
3207
3208         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
3209                 return put_user(0, ol);
3210         if (lvl != SOL_TIPC)
3211                 return -ENOPROTOOPT;
3212         res = get_user(len, ol);
3213         if (res)
3214                 return res;
3215
3216         lock_sock(sk);
3217
3218         switch (opt) {
3219         case TIPC_IMPORTANCE:
3220                 value = tsk_importance(tsk);
3221                 break;
3222         case TIPC_SRC_DROPPABLE:
3223                 value = tsk_unreliable(tsk);
3224                 break;
3225         case TIPC_DEST_DROPPABLE:
3226                 value = tsk_unreturnable(tsk);
3227                 break;
3228         case TIPC_CONN_TIMEOUT:
3229                 value = tsk->conn_timeout;
3230                 /* no need to set "res", since already 0 at this point */
3231                 break;
3232         case TIPC_NODE_RECVQ_DEPTH:
3233                 value = 0; /* was tipc_queue_size, now obsolete */
3234                 break;
3235         case TIPC_SOCK_RECVQ_DEPTH:
3236                 value = skb_queue_len(&sk->sk_receive_queue);
3237                 break;
3238         case TIPC_SOCK_RECVQ_USED:
3239                 value = sk_rmem_alloc_get(sk);
3240                 break;
3241         case TIPC_GROUP_JOIN:
3242                 seq.type = 0;
3243                 if (tsk->group)
3244                         tipc_group_self(tsk->group, &seq, &scope);
3245                 value = seq.type;
3246                 break;
3247         default:
3248                 res = -EINVAL;
3249         }
3250
3251         release_sock(sk);
3252
3253         if (res)
3254                 return res;     /* "get" failed */
3255
3256         if (len < sizeof(value))
3257                 return -EINVAL;
3258
3259         if (copy_to_user(ov, &value, sizeof(value)))
3260                 return -EFAULT;
3261
3262         return put_user(sizeof(value), ol);
3263 }
3264
3265 static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
3266 {
3267         struct net *net = sock_net(sock->sk);
3268         struct tipc_sioc_nodeid_req nr = {0};
3269         struct tipc_sioc_ln_req lnr;
3270         void __user *argp = (void __user *)arg;
3271
3272         switch (cmd) {
3273         case SIOCGETLINKNAME:
3274                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
3275                         return -EFAULT;
3276                 if (!tipc_node_get_linkname(net,
3277                                             lnr.bearer_id & 0xffff, lnr.peer,
3278                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
3279                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
3280                                 return -EFAULT;
3281                         return 0;
3282                 }
3283                 return -EADDRNOTAVAIL;
3284         case SIOCGETNODEID:
3285                 if (copy_from_user(&nr, argp, sizeof(nr)))
3286                         return -EFAULT;
3287                 if (!tipc_node_get_id(net, nr.peer, nr.node_id))
3288                         return -EADDRNOTAVAIL;
3289                 if (copy_to_user(argp, &nr, sizeof(nr)))
3290                         return -EFAULT;
3291                 return 0;
3292         default:
3293                 return -ENOIOCTLCMD;
3294         }
3295 }
3296
3297 static int tipc_socketpair(struct socket *sock1, struct socket *sock2)
3298 {
3299         struct tipc_sock *tsk2 = tipc_sk(sock2->sk);
3300         struct tipc_sock *tsk1 = tipc_sk(sock1->sk);
3301         u32 onode = tipc_own_addr(sock_net(sock1->sk));
3302
3303         tsk1->peer.family = AF_TIPC;
3304         tsk1->peer.addrtype = TIPC_ADDR_ID;
3305         tsk1->peer.scope = TIPC_NODE_SCOPE;
3306         tsk1->peer.addr.id.ref = tsk2->portid;
3307         tsk1->peer.addr.id.node = onode;
3308         tsk2->peer.family = AF_TIPC;
3309         tsk2->peer.addrtype = TIPC_ADDR_ID;
3310         tsk2->peer.scope = TIPC_NODE_SCOPE;
3311         tsk2->peer.addr.id.ref = tsk1->portid;
3312         tsk2->peer.addr.id.node = onode;
3313
3314         tipc_sk_finish_conn(tsk1, tsk2->portid, onode);
3315         tipc_sk_finish_conn(tsk2, tsk1->portid, onode);
3316         return 0;
3317 }
3318
3319 /* Protocol switches for the various types of TIPC sockets */
3320
3321 static const struct proto_ops msg_ops = {
3322         .owner          = THIS_MODULE,
3323         .family         = AF_TIPC,
3324         .release        = tipc_release,
3325         .bind           = tipc_bind,
3326         .connect        = tipc_connect,
3327         .socketpair     = tipc_socketpair,
3328         .accept         = sock_no_accept,
3329         .getname        = tipc_getname,
3330         .poll           = tipc_poll,
3331         .ioctl          = tipc_ioctl,
3332         .listen         = sock_no_listen,
3333         .shutdown       = tipc_shutdown,
3334         .setsockopt     = tipc_setsockopt,
3335         .getsockopt     = tipc_getsockopt,
3336         .sendmsg        = tipc_sendmsg,
3337         .recvmsg        = tipc_recvmsg,
3338         .mmap           = sock_no_mmap,
3339         .sendpage       = sock_no_sendpage
3340 };
3341
3342 static const struct proto_ops packet_ops = {
3343         .owner          = THIS_MODULE,
3344         .family         = AF_TIPC,
3345         .release        = tipc_release,
3346         .bind           = tipc_bind,
3347         .connect        = tipc_connect,
3348         .socketpair     = tipc_socketpair,
3349         .accept         = tipc_accept,
3350         .getname        = tipc_getname,
3351         .poll           = tipc_poll,
3352         .ioctl          = tipc_ioctl,
3353         .listen         = tipc_listen,
3354         .shutdown       = tipc_shutdown,
3355         .setsockopt     = tipc_setsockopt,
3356         .getsockopt     = tipc_getsockopt,
3357         .sendmsg        = tipc_send_packet,
3358         .recvmsg        = tipc_recvmsg,
3359         .mmap           = sock_no_mmap,
3360         .sendpage       = sock_no_sendpage
3361 };
3362
3363 static const struct proto_ops stream_ops = {
3364         .owner          = THIS_MODULE,
3365         .family         = AF_TIPC,
3366         .release        = tipc_release,
3367         .bind           = tipc_bind,
3368         .connect        = tipc_connect,
3369         .socketpair     = tipc_socketpair,
3370         .accept         = tipc_accept,
3371         .getname        = tipc_getname,
3372         .poll           = tipc_poll,
3373         .ioctl          = tipc_ioctl,
3374         .listen         = tipc_listen,
3375         .shutdown       = tipc_shutdown,
3376         .setsockopt     = tipc_setsockopt,
3377         .getsockopt     = tipc_getsockopt,
3378         .sendmsg        = tipc_sendstream,
3379         .recvmsg        = tipc_recvstream,
3380         .mmap           = sock_no_mmap,
3381         .sendpage       = sock_no_sendpage
3382 };
3383
3384 static const struct net_proto_family tipc_family_ops = {
3385         .owner          = THIS_MODULE,
3386         .family         = AF_TIPC,
3387         .create         = tipc_sk_create
3388 };
3389
3390 static struct proto tipc_proto = {
3391         .name           = "TIPC",
3392         .owner          = THIS_MODULE,
3393         .obj_size       = sizeof(struct tipc_sock),
3394         .sysctl_rmem    = sysctl_tipc_rmem
3395 };
3396
3397 /**
3398  * tipc_socket_init - initialize TIPC socket interface
3399  *
3400  * Returns 0 on success, errno otherwise
3401  */
3402 int tipc_socket_init(void)
3403 {
3404         int res;
3405
3406         res = proto_register(&tipc_proto, 1);
3407         if (res) {
3408                 pr_err("Failed to register TIPC protocol type\n");
3409                 goto out;
3410         }
3411
3412         res = sock_register(&tipc_family_ops);
3413         if (res) {
3414                 pr_err("Failed to register TIPC socket type\n");
3415                 proto_unregister(&tipc_proto);
3416                 goto out;
3417         }
3418  out:
3419         return res;
3420 }
3421
3422 /**
3423  * tipc_socket_stop - stop TIPC socket interface
3424  */
3425 void tipc_socket_stop(void)
3426 {
3427         sock_unregister(tipc_family_ops.family);
3428         proto_unregister(&tipc_proto);
3429 }
3430
3431 /* Caller should hold socket lock for the passed tipc socket. */
3432 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
3433 {
3434         u32 peer_node;
3435         u32 peer_port;
3436         struct nlattr *nest;
3437
3438         peer_node = tsk_peer_node(tsk);
3439         peer_port = tsk_peer_port(tsk);
3440
3441         nest = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_CON);
3442         if (!nest)
3443                 return -EMSGSIZE;
3444
3445         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
3446                 goto msg_full;
3447         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
3448                 goto msg_full;
3449
3450         if (tsk->conn_type != 0) {
3451                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
3452                         goto msg_full;
3453                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
3454                         goto msg_full;
3455                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
3456                         goto msg_full;
3457         }
3458         nla_nest_end(skb, nest);
3459
3460         return 0;
3461
3462 msg_full:
3463         nla_nest_cancel(skb, nest);
3464
3465         return -EMSGSIZE;
3466 }
3467
3468 static int __tipc_nl_add_sk_info(struct sk_buff *skb, struct tipc_sock
3469                           *tsk)
3470 {
3471         struct net *net = sock_net(skb->sk);
3472         struct sock *sk = &tsk->sk;
3473
3474         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid) ||
3475             nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr(net)))
3476                 return -EMSGSIZE;
3477
3478         if (tipc_sk_connected(sk)) {
3479                 if (__tipc_nl_add_sk_con(skb, tsk))
3480                         return -EMSGSIZE;
3481         } else if (!list_empty(&tsk->publications)) {
3482                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
3483                         return -EMSGSIZE;
3484         }
3485         return 0;
3486 }
3487
3488 /* Caller should hold socket lock for the passed tipc socket. */
3489 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
3490                             struct tipc_sock *tsk)
3491 {
3492         struct nlattr *attrs;
3493         void *hdr;
3494
3495         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
3496                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
3497         if (!hdr)
3498                 goto msg_cancel;
3499
3500         attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK);
3501         if (!attrs)
3502                 goto genlmsg_cancel;
3503
3504         if (__tipc_nl_add_sk_info(skb, tsk))
3505                 goto attr_msg_cancel;
3506
3507         nla_nest_end(skb, attrs);
3508         genlmsg_end(skb, hdr);
3509
3510         return 0;
3511
3512 attr_msg_cancel:
3513         nla_nest_cancel(skb, attrs);
3514 genlmsg_cancel:
3515         genlmsg_cancel(skb, hdr);
3516 msg_cancel:
3517         return -EMSGSIZE;
3518 }
3519
3520 int tipc_nl_sk_walk(struct sk_buff *skb, struct netlink_callback *cb,
3521                     int (*skb_handler)(struct sk_buff *skb,
3522                                        struct netlink_callback *cb,
3523                                        struct tipc_sock *tsk))
3524 {
3525         struct rhashtable_iter *iter = (void *)cb->args[4];
3526         struct tipc_sock *tsk;
3527         int err;
3528
3529         rhashtable_walk_start(iter);
3530         while ((tsk = rhashtable_walk_next(iter)) != NULL) {
3531                 if (IS_ERR(tsk)) {
3532                         err = PTR_ERR(tsk);
3533                         if (err == -EAGAIN) {
3534                                 err = 0;
3535                                 continue;
3536                         }
3537                         break;
3538                 }
3539
3540                 sock_hold(&tsk->sk);
3541                 rhashtable_walk_stop(iter);
3542                 lock_sock(&tsk->sk);
3543                 err = skb_handler(skb, cb, tsk);
3544                 if (err) {
3545                         release_sock(&tsk->sk);
3546                         sock_put(&tsk->sk);
3547                         goto out;
3548                 }
3549                 release_sock(&tsk->sk);
3550                 rhashtable_walk_start(iter);
3551                 sock_put(&tsk->sk);
3552         }
3553         rhashtable_walk_stop(iter);
3554 out:
3555         return skb->len;
3556 }
3557 EXPORT_SYMBOL(tipc_nl_sk_walk);
3558
3559 int tipc_dump_start(struct netlink_callback *cb)
3560 {
3561         return __tipc_dump_start(cb, sock_net(cb->skb->sk));
3562 }
3563 EXPORT_SYMBOL(tipc_dump_start);
3564
3565 int __tipc_dump_start(struct netlink_callback *cb, struct net *net)
3566 {
3567         /* tipc_nl_name_table_dump() uses cb->args[0...3]. */
3568         struct rhashtable_iter *iter = (void *)cb->args[4];
3569         struct tipc_net *tn = tipc_net(net);
3570
3571         if (!iter) {
3572                 iter = kmalloc(sizeof(*iter), GFP_KERNEL);
3573                 if (!iter)
3574                         return -ENOMEM;
3575
3576                 cb->args[4] = (long)iter;
3577         }
3578
3579         rhashtable_walk_enter(&tn->sk_rht, iter);
3580         return 0;
3581 }
3582
3583 int tipc_dump_done(struct netlink_callback *cb)
3584 {
3585         struct rhashtable_iter *hti = (void *)cb->args[4];
3586
3587         rhashtable_walk_exit(hti);
3588         kfree(hti);
3589         return 0;
3590 }
3591 EXPORT_SYMBOL(tipc_dump_done);
3592
3593 int tipc_sk_fill_sock_diag(struct sk_buff *skb, struct netlink_callback *cb,
3594                            struct tipc_sock *tsk, u32 sk_filter_state,
3595                            u64 (*tipc_diag_gen_cookie)(struct sock *sk))
3596 {
3597         struct sock *sk = &tsk->sk;
3598         struct nlattr *attrs;
3599         struct nlattr *stat;
3600
3601         /*filter response w.r.t sk_state*/
3602         if (!(sk_filter_state & (1 << sk->sk_state)))
3603                 return 0;
3604
3605         attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK);
3606         if (!attrs)
3607                 goto msg_cancel;
3608
3609         if (__tipc_nl_add_sk_info(skb, tsk))
3610                 goto attr_msg_cancel;
3611
3612         if (nla_put_u32(skb, TIPC_NLA_SOCK_TYPE, (u32)sk->sk_type) ||
3613             nla_put_u32(skb, TIPC_NLA_SOCK_TIPC_STATE, (u32)sk->sk_state) ||
3614             nla_put_u32(skb, TIPC_NLA_SOCK_INO, sock_i_ino(sk)) ||
3615             nla_put_u32(skb, TIPC_NLA_SOCK_UID,
3616                         from_kuid_munged(sk_user_ns(NETLINK_CB(cb->skb).sk),
3617                                          sock_i_uid(sk))) ||
3618             nla_put_u64_64bit(skb, TIPC_NLA_SOCK_COOKIE,
3619                               tipc_diag_gen_cookie(sk),
3620                               TIPC_NLA_SOCK_PAD))
3621                 goto attr_msg_cancel;
3622
3623         stat = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_STAT);
3624         if (!stat)
3625                 goto attr_msg_cancel;
3626
3627         if (nla_put_u32(skb, TIPC_NLA_SOCK_STAT_RCVQ,
3628                         skb_queue_len(&sk->sk_receive_queue)) ||
3629             nla_put_u32(skb, TIPC_NLA_SOCK_STAT_SENDQ,
3630                         skb_queue_len(&sk->sk_write_queue)) ||
3631             nla_put_u32(skb, TIPC_NLA_SOCK_STAT_DROP,
3632                         atomic_read(&sk->sk_drops)))
3633                 goto stat_msg_cancel;
3634
3635         if (tsk->cong_link_cnt &&
3636             nla_put_flag(skb, TIPC_NLA_SOCK_STAT_LINK_CONG))
3637                 goto stat_msg_cancel;
3638
3639         if (tsk_conn_cong(tsk) &&
3640             nla_put_flag(skb, TIPC_NLA_SOCK_STAT_CONN_CONG))
3641                 goto stat_msg_cancel;
3642
3643         nla_nest_end(skb, stat);
3644
3645         if (tsk->group)
3646                 if (tipc_group_fill_sock_diag(tsk->group, skb))
3647                         goto stat_msg_cancel;
3648
3649         nla_nest_end(skb, attrs);
3650
3651         return 0;
3652
3653 stat_msg_cancel:
3654         nla_nest_cancel(skb, stat);
3655 attr_msg_cancel:
3656         nla_nest_cancel(skb, attrs);
3657 msg_cancel:
3658         return -EMSGSIZE;
3659 }
3660 EXPORT_SYMBOL(tipc_sk_fill_sock_diag);
3661
3662 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
3663 {
3664         return tipc_nl_sk_walk(skb, cb, __tipc_nl_add_sk);
3665 }
3666
3667 /* Caller should hold socket lock for the passed tipc socket. */
3668 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
3669                                  struct netlink_callback *cb,
3670                                  struct publication *publ)
3671 {
3672         void *hdr;
3673         struct nlattr *attrs;
3674
3675         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
3676                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
3677         if (!hdr)
3678                 goto msg_cancel;
3679
3680         attrs = nla_nest_start_noflag(skb, TIPC_NLA_PUBL);
3681         if (!attrs)
3682                 goto genlmsg_cancel;
3683
3684         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
3685                 goto attr_msg_cancel;
3686         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
3687                 goto attr_msg_cancel;
3688         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
3689                 goto attr_msg_cancel;
3690         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
3691                 goto attr_msg_cancel;
3692
3693         nla_nest_end(skb, attrs);
3694         genlmsg_end(skb, hdr);
3695
3696         return 0;
3697
3698 attr_msg_cancel:
3699         nla_nest_cancel(skb, attrs);
3700 genlmsg_cancel:
3701         genlmsg_cancel(skb, hdr);
3702 msg_cancel:
3703         return -EMSGSIZE;
3704 }
3705
3706 /* Caller should hold socket lock for the passed tipc socket. */
3707 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
3708                                   struct netlink_callback *cb,
3709                                   struct tipc_sock *tsk, u32 *last_publ)
3710 {
3711         int err;
3712         struct publication *p;
3713
3714         if (*last_publ) {
3715                 list_for_each_entry(p, &tsk->publications, binding_sock) {
3716                         if (p->key == *last_publ)
3717                                 break;
3718                 }
3719                 if (p->key != *last_publ) {
3720                         /* We never set seq or call nl_dump_check_consistent()
3721                          * this means that setting prev_seq here will cause the
3722                          * consistence check to fail in the netlink callback
3723                          * handler. Resulting in the last NLMSG_DONE message
3724                          * having the NLM_F_DUMP_INTR flag set.
3725                          */
3726                         cb->prev_seq = 1;
3727                         *last_publ = 0;
3728                         return -EPIPE;
3729                 }
3730         } else {
3731                 p = list_first_entry(&tsk->publications, struct publication,
3732                                      binding_sock);
3733         }
3734
3735         list_for_each_entry_from(p, &tsk->publications, binding_sock) {
3736                 err = __tipc_nl_add_sk_publ(skb, cb, p);
3737                 if (err) {
3738                         *last_publ = p->key;
3739                         return err;
3740                 }
3741         }
3742         *last_publ = 0;
3743
3744         return 0;
3745 }
3746
3747 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
3748 {
3749         int err;
3750         u32 tsk_portid = cb->args[0];
3751         u32 last_publ = cb->args[1];
3752         u32 done = cb->args[2];
3753         struct net *net = sock_net(skb->sk);
3754         struct tipc_sock *tsk;
3755
3756         if (!tsk_portid) {
3757                 struct nlattr **attrs = genl_dumpit_info(cb)->attrs;
3758                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
3759
3760                 if (!attrs[TIPC_NLA_SOCK])
3761                         return -EINVAL;
3762
3763                 err = nla_parse_nested_deprecated(sock, TIPC_NLA_SOCK_MAX,
3764                                                   attrs[TIPC_NLA_SOCK],
3765                                                   tipc_nl_sock_policy, NULL);
3766                 if (err)
3767                         return err;
3768
3769                 if (!sock[TIPC_NLA_SOCK_REF])
3770                         return -EINVAL;
3771
3772                 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
3773         }
3774
3775         if (done)
3776                 return 0;
3777
3778         tsk = tipc_sk_lookup(net, tsk_portid);
3779         if (!tsk)
3780                 return -EINVAL;
3781
3782         lock_sock(&tsk->sk);
3783         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
3784         if (!err)
3785                 done = 1;
3786         release_sock(&tsk->sk);
3787         sock_put(&tsk->sk);
3788
3789         cb->args[0] = tsk_portid;
3790         cb->args[1] = last_publ;
3791         cb->args[2] = done;
3792
3793         return skb->len;
3794 }
3795
3796 /**
3797  * tipc_sk_filtering - check if a socket should be traced
3798  * @sk: the socket to be examined
3799  * @sysctl_tipc_sk_filter[]: the socket tuple for filtering,
3800  *  (portid, sock type, name type, name lower, name upper)
3801  *
3802  * Returns true if the socket meets the socket tuple data
3803  * (value 0 = 'any') or when there is no tuple set (all = 0),
3804  * otherwise false
3805  */
3806 bool tipc_sk_filtering(struct sock *sk)
3807 {
3808         struct tipc_sock *tsk;
3809         struct publication *p;
3810         u32 _port, _sktype, _type, _lower, _upper;
3811         u32 type = 0, lower = 0, upper = 0;
3812
3813         if (!sk)
3814                 return true;
3815
3816         tsk = tipc_sk(sk);
3817
3818         _port = sysctl_tipc_sk_filter[0];
3819         _sktype = sysctl_tipc_sk_filter[1];
3820         _type = sysctl_tipc_sk_filter[2];
3821         _lower = sysctl_tipc_sk_filter[3];
3822         _upper = sysctl_tipc_sk_filter[4];
3823
3824         if (!_port && !_sktype && !_type && !_lower && !_upper)
3825                 return true;
3826
3827         if (_port)
3828                 return (_port == tsk->portid);
3829
3830         if (_sktype && _sktype != sk->sk_type)
3831                 return false;
3832
3833         if (tsk->published) {
3834                 p = list_first_entry_or_null(&tsk->publications,
3835                                              struct publication, binding_sock);
3836                 if (p) {
3837                         type = p->type;
3838                         lower = p->lower;
3839                         upper = p->upper;
3840                 }
3841         }
3842
3843         if (!tipc_sk_type_connectionless(sk)) {
3844                 type = tsk->conn_type;
3845                 lower = tsk->conn_instance;
3846                 upper = tsk->conn_instance;
3847         }
3848
3849         if ((_type && _type != type) || (_lower && _lower != lower) ||
3850             (_upper && _upper != upper))
3851                 return false;
3852
3853         return true;
3854 }
3855
3856 u32 tipc_sock_get_portid(struct sock *sk)
3857 {
3858         return (sk) ? (tipc_sk(sk))->portid : 0;
3859 }
3860
3861 /**
3862  * tipc_sk_overlimit1 - check if socket rx queue is about to be overloaded,
3863  *                      both the rcv and backlog queues are considered
3864  * @sk: tipc sk to be checked
3865  * @skb: tipc msg to be checked
3866  *
3867  * Returns true if the socket rx queue allocation is > 90%, otherwise false
3868  */
3869
3870 bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb)
3871 {
3872         atomic_t *dcnt = &tipc_sk(sk)->dupl_rcvcnt;
3873         unsigned int lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
3874         unsigned int qsize = sk->sk_backlog.len + sk_rmem_alloc_get(sk);
3875
3876         return (qsize > lim * 90 / 100);
3877 }
3878
3879 /**
3880  * tipc_sk_overlimit2 - check if socket rx queue is about to be overloaded,
3881  *                      only the rcv queue is considered
3882  * @sk: tipc sk to be checked
3883  * @skb: tipc msg to be checked
3884  *
3885  * Returns true if the socket rx queue allocation is > 90%, otherwise false
3886  */
3887
3888 bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb)
3889 {
3890         unsigned int lim = rcvbuf_limit(sk, skb);
3891         unsigned int qsize = sk_rmem_alloc_get(sk);
3892
3893         return (qsize > lim * 90 / 100);
3894 }
3895
3896 /**
3897  * tipc_sk_dump - dump TIPC socket
3898  * @sk: tipc sk to be dumped
3899  * @dqueues: bitmask to decide if any socket queue to be dumped?
3900  *           - TIPC_DUMP_NONE: don't dump socket queues
3901  *           - TIPC_DUMP_SK_SNDQ: dump socket send queue
3902  *           - TIPC_DUMP_SK_RCVQ: dump socket rcv queue
3903  *           - TIPC_DUMP_SK_BKLGQ: dump socket backlog queue
3904  *           - TIPC_DUMP_ALL: dump all the socket queues above
3905  * @buf: returned buffer of dump data in format
3906  */
3907 int tipc_sk_dump(struct sock *sk, u16 dqueues, char *buf)
3908 {
3909         int i = 0;
3910         size_t sz = (dqueues) ? SK_LMAX : SK_LMIN;
3911         struct tipc_sock *tsk;
3912         struct publication *p;
3913         bool tsk_connected;
3914
3915         if (!sk) {
3916                 i += scnprintf(buf, sz, "sk data: (null)\n");
3917                 return i;
3918         }
3919
3920         tsk = tipc_sk(sk);
3921         tsk_connected = !tipc_sk_type_connectionless(sk);
3922
3923         i += scnprintf(buf, sz, "sk data: %u", sk->sk_type);
3924         i += scnprintf(buf + i, sz - i, " %d", sk->sk_state);
3925         i += scnprintf(buf + i, sz - i, " %x", tsk_own_node(tsk));
3926         i += scnprintf(buf + i, sz - i, " %u", tsk->portid);
3927         i += scnprintf(buf + i, sz - i, " | %u", tsk_connected);
3928         if (tsk_connected) {
3929                 i += scnprintf(buf + i, sz - i, " %x", tsk_peer_node(tsk));
3930                 i += scnprintf(buf + i, sz - i, " %u", tsk_peer_port(tsk));
3931                 i += scnprintf(buf + i, sz - i, " %u", tsk->conn_type);
3932                 i += scnprintf(buf + i, sz - i, " %u", tsk->conn_instance);
3933         }
3934         i += scnprintf(buf + i, sz - i, " | %u", tsk->published);
3935         if (tsk->published) {
3936                 p = list_first_entry_or_null(&tsk->publications,
3937                                              struct publication, binding_sock);
3938                 i += scnprintf(buf + i, sz - i, " %u", (p) ? p->type : 0);
3939                 i += scnprintf(buf + i, sz - i, " %u", (p) ? p->lower : 0);
3940                 i += scnprintf(buf + i, sz - i, " %u", (p) ? p->upper : 0);
3941         }
3942         i += scnprintf(buf + i, sz - i, " | %u", tsk->snd_win);
3943         i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_win);
3944         i += scnprintf(buf + i, sz - i, " %u", tsk->max_pkt);
3945         i += scnprintf(buf + i, sz - i, " %x", tsk->peer_caps);
3946         i += scnprintf(buf + i, sz - i, " %u", tsk->cong_link_cnt);
3947         i += scnprintf(buf + i, sz - i, " %u", tsk->snt_unacked);
3948         i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_unacked);
3949         i += scnprintf(buf + i, sz - i, " %u", atomic_read(&tsk->dupl_rcvcnt));
3950         i += scnprintf(buf + i, sz - i, " %u", sk->sk_shutdown);
3951         i += scnprintf(buf + i, sz - i, " | %d", sk_wmem_alloc_get(sk));
3952         i += scnprintf(buf + i, sz - i, " %d", sk->sk_sndbuf);
3953         i += scnprintf(buf + i, sz - i, " | %d", sk_rmem_alloc_get(sk));
3954         i += scnprintf(buf + i, sz - i, " %d", sk->sk_rcvbuf);
3955         i += scnprintf(buf + i, sz - i, " | %d\n", READ_ONCE(sk->sk_backlog.len));
3956
3957         if (dqueues & TIPC_DUMP_SK_SNDQ) {
3958                 i += scnprintf(buf + i, sz - i, "sk_write_queue: ");
3959                 i += tipc_list_dump(&sk->sk_write_queue, false, buf + i);
3960         }
3961
3962         if (dqueues & TIPC_DUMP_SK_RCVQ) {
3963                 i += scnprintf(buf + i, sz - i, "sk_receive_queue: ");
3964                 i += tipc_list_dump(&sk->sk_receive_queue, false, buf + i);
3965         }
3966
3967         if (dqueues & TIPC_DUMP_SK_BKLGQ) {
3968                 i += scnprintf(buf + i, sz - i, "sk_backlog:\n  head ");
3969                 i += tipc_skb_dump(sk->sk_backlog.head, false, buf + i);
3970                 if (sk->sk_backlog.tail != sk->sk_backlog.head) {
3971                         i += scnprintf(buf + i, sz - i, "  tail ");
3972                         i += tipc_skb_dump(sk->sk_backlog.tail, false,
3973                                            buf + i);
3974                 }
3975         }
3976
3977         return i;
3978 }