perf stat aggregation: Add separate node member
[linux-2.6-microblaze.git] / net / tipc / msg.c
1 /*
2  * net/tipc/msg.c: TIPC message header routines
3  *
4  * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
5  * Copyright (c) 2005, 2010-2011, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <net/sock.h>
38 #include "core.h"
39 #include "msg.h"
40 #include "addr.h"
41 #include "name_table.h"
42 #include "crypto.h"
43
44 #define MAX_FORWARD_SIZE 1024
45 #ifdef CONFIG_TIPC_CRYPTO
46 #define BUF_HEADROOM ALIGN(((LL_MAX_HEADER + 48) + EHDR_MAX_SIZE), 16)
47 #define BUF_TAILROOM (TIPC_AES_GCM_TAG_SIZE)
48 #else
49 #define BUF_HEADROOM (LL_MAX_HEADER + 48)
50 #define BUF_TAILROOM 16
51 #endif
52
53 static unsigned int align(unsigned int i)
54 {
55         return (i + 3) & ~3u;
56 }
57
58 /**
59  * tipc_buf_acquire - creates a TIPC message buffer
60  * @size: message size (including TIPC header)
61  * @gfp: memory allocation flags
62  *
63  * Return: a new buffer with data pointers set to the specified size.
64  *
65  * NOTE:
66  * Headroom is reserved to allow prepending of a data link header.
67  * There may also be unrequested tailroom present at the buffer's end.
68  */
69 struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp)
70 {
71         struct sk_buff *skb;
72 #ifdef CONFIG_TIPC_CRYPTO
73         unsigned int buf_size = (BUF_HEADROOM + size + BUF_TAILROOM + 3) & ~3u;
74 #else
75         unsigned int buf_size = (BUF_HEADROOM + size + 3) & ~3u;
76 #endif
77
78         skb = alloc_skb_fclone(buf_size, gfp);
79         if (skb) {
80                 skb_reserve(skb, BUF_HEADROOM);
81                 skb_put(skb, size);
82                 skb->next = NULL;
83         }
84         return skb;
85 }
86
87 void tipc_msg_init(u32 own_node, struct tipc_msg *m, u32 user, u32 type,
88                    u32 hsize, u32 dnode)
89 {
90         memset(m, 0, hsize);
91         msg_set_version(m);
92         msg_set_user(m, user);
93         msg_set_hdr_sz(m, hsize);
94         msg_set_size(m, hsize);
95         msg_set_prevnode(m, own_node);
96         msg_set_type(m, type);
97         if (hsize > SHORT_H_SIZE) {
98                 msg_set_orignode(m, own_node);
99                 msg_set_destnode(m, dnode);
100         }
101 }
102
103 struct sk_buff *tipc_msg_create(uint user, uint type,
104                                 uint hdr_sz, uint data_sz, u32 dnode,
105                                 u32 onode, u32 dport, u32 oport, int errcode)
106 {
107         struct tipc_msg *msg;
108         struct sk_buff *buf;
109
110         buf = tipc_buf_acquire(hdr_sz + data_sz, GFP_ATOMIC);
111         if (unlikely(!buf))
112                 return NULL;
113
114         msg = buf_msg(buf);
115         tipc_msg_init(onode, msg, user, type, hdr_sz, dnode);
116         msg_set_size(msg, hdr_sz + data_sz);
117         msg_set_origport(msg, oport);
118         msg_set_destport(msg, dport);
119         msg_set_errcode(msg, errcode);
120         if (hdr_sz > SHORT_H_SIZE) {
121                 msg_set_orignode(msg, onode);
122                 msg_set_destnode(msg, dnode);
123         }
124         return buf;
125 }
126
127 /* tipc_buf_append(): Append a buffer to the fragment list of another buffer
128  * @*headbuf: in:  NULL for first frag, otherwise value returned from prev call
129  *            out: set when successful non-complete reassembly, otherwise NULL
130  * @*buf:     in:  the buffer to append. Always defined
131  *            out: head buf after successful complete reassembly, otherwise NULL
132  * Returns 1 when reassembly complete, otherwise 0
133  */
134 int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
135 {
136         struct sk_buff *head = *headbuf;
137         struct sk_buff *frag = *buf;
138         struct sk_buff *tail = NULL;
139         struct tipc_msg *msg;
140         u32 fragid;
141         int delta;
142         bool headstolen;
143
144         if (!frag)
145                 goto err;
146
147         msg = buf_msg(frag);
148         fragid = msg_type(msg);
149         frag->next = NULL;
150         skb_pull(frag, msg_hdr_sz(msg));
151
152         if (fragid == FIRST_FRAGMENT) {
153                 if (unlikely(head))
154                         goto err;
155                 *buf = NULL;
156                 frag = skb_unshare(frag, GFP_ATOMIC);
157                 if (unlikely(!frag))
158                         goto err;
159                 head = *headbuf = frag;
160                 TIPC_SKB_CB(head)->tail = NULL;
161                 if (skb_is_nonlinear(head)) {
162                         skb_walk_frags(head, tail) {
163                                 TIPC_SKB_CB(head)->tail = tail;
164                         }
165                 } else {
166                         skb_frag_list_init(head);
167                 }
168                 return 0;
169         }
170
171         if (!head)
172                 goto err;
173
174         if (skb_try_coalesce(head, frag, &headstolen, &delta)) {
175                 kfree_skb_partial(frag, headstolen);
176         } else {
177                 tail = TIPC_SKB_CB(head)->tail;
178                 if (!skb_has_frag_list(head))
179                         skb_shinfo(head)->frag_list = frag;
180                 else
181                         tail->next = frag;
182                 head->truesize += frag->truesize;
183                 head->data_len += frag->len;
184                 head->len += frag->len;
185                 TIPC_SKB_CB(head)->tail = frag;
186         }
187
188         if (fragid == LAST_FRAGMENT) {
189                 TIPC_SKB_CB(head)->validated = 0;
190                 if (unlikely(!tipc_msg_validate(&head)))
191                         goto err;
192                 *buf = head;
193                 TIPC_SKB_CB(head)->tail = NULL;
194                 *headbuf = NULL;
195                 return 1;
196         }
197         *buf = NULL;
198         return 0;
199 err:
200         kfree_skb(*buf);
201         kfree_skb(*headbuf);
202         *buf = *headbuf = NULL;
203         return 0;
204 }
205
206 /**
207  * tipc_msg_append(): Append data to tail of an existing buffer queue
208  * @_hdr: header to be used
209  * @m: the data to be appended
210  * @mss: max allowable size of buffer
211  * @dlen: size of data to be appended
212  * @txq: queue to append to
213  *
214  * Return: the number of 1k blocks appended or errno value
215  */
216 int tipc_msg_append(struct tipc_msg *_hdr, struct msghdr *m, int dlen,
217                     int mss, struct sk_buff_head *txq)
218 {
219         struct sk_buff *skb;
220         int accounted, total, curr;
221         int mlen, cpy, rem = dlen;
222         struct tipc_msg *hdr;
223
224         skb = skb_peek_tail(txq);
225         accounted = skb ? msg_blocks(buf_msg(skb)) : 0;
226         total = accounted;
227
228         do {
229                 if (!skb || skb->len >= mss) {
230                         skb = tipc_buf_acquire(mss, GFP_KERNEL);
231                         if (unlikely(!skb))
232                                 return -ENOMEM;
233                         skb_orphan(skb);
234                         skb_trim(skb, MIN_H_SIZE);
235                         hdr = buf_msg(skb);
236                         skb_copy_to_linear_data(skb, _hdr, MIN_H_SIZE);
237                         msg_set_hdr_sz(hdr, MIN_H_SIZE);
238                         msg_set_size(hdr, MIN_H_SIZE);
239                         __skb_queue_tail(txq, skb);
240                         total += 1;
241                 }
242                 hdr = buf_msg(skb);
243                 curr = msg_blocks(hdr);
244                 mlen = msg_size(hdr);
245                 cpy = min_t(size_t, rem, mss - mlen);
246                 if (cpy != copy_from_iter(skb->data + mlen, cpy, &m->msg_iter))
247                         return -EFAULT;
248                 msg_set_size(hdr, mlen + cpy);
249                 skb_put(skb, cpy);
250                 rem -= cpy;
251                 total += msg_blocks(hdr) - curr;
252         } while (rem > 0);
253         return total - accounted;
254 }
255
256 /* tipc_msg_validate - validate basic format of received message
257  *
258  * This routine ensures a TIPC message has an acceptable header, and at least
259  * as much data as the header indicates it should.  The routine also ensures
260  * that the entire message header is stored in the main fragment of the message
261  * buffer, to simplify future access to message header fields.
262  *
263  * Note: Having extra info present in the message header or data areas is OK.
264  * TIPC will ignore the excess, under the assumption that it is optional info
265  * introduced by a later release of the protocol.
266  */
267 bool tipc_msg_validate(struct sk_buff **_skb)
268 {
269         struct sk_buff *skb = *_skb;
270         struct tipc_msg *hdr;
271         int msz, hsz;
272
273         /* Ensure that flow control ratio condition is satisfied */
274         if (unlikely(skb->truesize / buf_roundup_len(skb) >= 4)) {
275                 skb = skb_copy_expand(skb, BUF_HEADROOM, 0, GFP_ATOMIC);
276                 if (!skb)
277                         return false;
278                 kfree_skb(*_skb);
279                 *_skb = skb;
280         }
281
282         if (unlikely(TIPC_SKB_CB(skb)->validated))
283                 return true;
284
285         if (unlikely(!pskb_may_pull(skb, MIN_H_SIZE)))
286                 return false;
287
288         hsz = msg_hdr_sz(buf_msg(skb));
289         if (unlikely(hsz < MIN_H_SIZE) || (hsz > MAX_H_SIZE))
290                 return false;
291         if (unlikely(!pskb_may_pull(skb, hsz)))
292                 return false;
293
294         hdr = buf_msg(skb);
295         if (unlikely(msg_version(hdr) != TIPC_VERSION))
296                 return false;
297
298         msz = msg_size(hdr);
299         if (unlikely(msz < hsz))
300                 return false;
301         if (unlikely((msz - hsz) > TIPC_MAX_USER_MSG_SIZE))
302                 return false;
303         if (unlikely(skb->len < msz))
304                 return false;
305
306         TIPC_SKB_CB(skb)->validated = 1;
307         return true;
308 }
309
310 /**
311  * tipc_msg_fragment - build a fragment skb list for TIPC message
312  *
313  * @skb: TIPC message skb
314  * @hdr: internal msg header to be put on the top of the fragments
315  * @pktmax: max size of a fragment incl. the header
316  * @frags: returned fragment skb list
317  *
318  * Return: 0 if the fragmentation is successful, otherwise: -EINVAL
319  * or -ENOMEM
320  */
321 int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr,
322                       int pktmax, struct sk_buff_head *frags)
323 {
324         int pktno, nof_fragms, dsz, dmax, eat;
325         struct tipc_msg *_hdr;
326         struct sk_buff *_skb;
327         u8 *data;
328
329         /* Non-linear buffer? */
330         if (skb_linearize(skb))
331                 return -ENOMEM;
332
333         data = (u8 *)skb->data;
334         dsz = msg_size(buf_msg(skb));
335         dmax = pktmax - INT_H_SIZE;
336         if (dsz <= dmax || !dmax)
337                 return -EINVAL;
338
339         nof_fragms = dsz / dmax + 1;
340         for (pktno = 1; pktno <= nof_fragms; pktno++) {
341                 if (pktno < nof_fragms)
342                         eat = dmax;
343                 else
344                         eat = dsz % dmax;
345                 /* Allocate a new fragment */
346                 _skb = tipc_buf_acquire(INT_H_SIZE + eat, GFP_ATOMIC);
347                 if (!_skb)
348                         goto error;
349                 skb_orphan(_skb);
350                 __skb_queue_tail(frags, _skb);
351                 /* Copy header & data to the fragment */
352                 skb_copy_to_linear_data(_skb, hdr, INT_H_SIZE);
353                 skb_copy_to_linear_data_offset(_skb, INT_H_SIZE, data, eat);
354                 data += eat;
355                 /* Update the fragment's header */
356                 _hdr = buf_msg(_skb);
357                 msg_set_fragm_no(_hdr, pktno);
358                 msg_set_nof_fragms(_hdr, nof_fragms);
359                 msg_set_size(_hdr, INT_H_SIZE + eat);
360         }
361         return 0;
362
363 error:
364         __skb_queue_purge(frags);
365         __skb_queue_head_init(frags);
366         return -ENOMEM;
367 }
368
369 /**
370  * tipc_msg_build - create buffer chain containing specified header and data
371  * @mhdr: Message header, to be prepended to data
372  * @m: User message
373  * @offset: buffer offset for fragmented messages (FIXME)
374  * @dsz: Total length of user data
375  * @pktmax: Max packet size that can be used
376  * @list: Buffer or chain of buffers to be returned to caller
377  *
378  * Note that the recursive call we are making here is safe, since it can
379  * logically go only one further level down.
380  *
381  * Return: message data size or errno: -ENOMEM, -EFAULT
382  */
383 int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset,
384                    int dsz, int pktmax, struct sk_buff_head *list)
385 {
386         int mhsz = msg_hdr_sz(mhdr);
387         struct tipc_msg pkthdr;
388         int msz = mhsz + dsz;
389         int pktrem = pktmax;
390         struct sk_buff *skb;
391         int drem = dsz;
392         int pktno = 1;
393         char *pktpos;
394         int pktsz;
395         int rc;
396
397         msg_set_size(mhdr, msz);
398
399         /* No fragmentation needed? */
400         if (likely(msz <= pktmax)) {
401                 skb = tipc_buf_acquire(msz, GFP_KERNEL);
402
403                 /* Fall back to smaller MTU if node local message */
404                 if (unlikely(!skb)) {
405                         if (pktmax != MAX_MSG_SIZE)
406                                 return -ENOMEM;
407                         rc = tipc_msg_build(mhdr, m, offset, dsz, FB_MTU, list);
408                         if (rc != dsz)
409                                 return rc;
410                         if (tipc_msg_assemble(list))
411                                 return dsz;
412                         return -ENOMEM;
413                 }
414                 skb_orphan(skb);
415                 __skb_queue_tail(list, skb);
416                 skb_copy_to_linear_data(skb, mhdr, mhsz);
417                 pktpos = skb->data + mhsz;
418                 if (copy_from_iter_full(pktpos, dsz, &m->msg_iter))
419                         return dsz;
420                 rc = -EFAULT;
421                 goto error;
422         }
423
424         /* Prepare reusable fragment header */
425         tipc_msg_init(msg_prevnode(mhdr), &pkthdr, MSG_FRAGMENTER,
426                       FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr));
427         msg_set_size(&pkthdr, pktmax);
428         msg_set_fragm_no(&pkthdr, pktno);
429         msg_set_importance(&pkthdr, msg_importance(mhdr));
430
431         /* Prepare first fragment */
432         skb = tipc_buf_acquire(pktmax, GFP_KERNEL);
433         if (!skb)
434                 return -ENOMEM;
435         skb_orphan(skb);
436         __skb_queue_tail(list, skb);
437         pktpos = skb->data;
438         skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
439         pktpos += INT_H_SIZE;
440         pktrem -= INT_H_SIZE;
441         skb_copy_to_linear_data_offset(skb, INT_H_SIZE, mhdr, mhsz);
442         pktpos += mhsz;
443         pktrem -= mhsz;
444
445         do {
446                 if (drem < pktrem)
447                         pktrem = drem;
448
449                 if (!copy_from_iter_full(pktpos, pktrem, &m->msg_iter)) {
450                         rc = -EFAULT;
451                         goto error;
452                 }
453                 drem -= pktrem;
454
455                 if (!drem)
456                         break;
457
458                 /* Prepare new fragment: */
459                 if (drem < (pktmax - INT_H_SIZE))
460                         pktsz = drem + INT_H_SIZE;
461                 else
462                         pktsz = pktmax;
463                 skb = tipc_buf_acquire(pktsz, GFP_KERNEL);
464                 if (!skb) {
465                         rc = -ENOMEM;
466                         goto error;
467                 }
468                 skb_orphan(skb);
469                 __skb_queue_tail(list, skb);
470                 msg_set_type(&pkthdr, FRAGMENT);
471                 msg_set_size(&pkthdr, pktsz);
472                 msg_set_fragm_no(&pkthdr, ++pktno);
473                 skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
474                 pktpos = skb->data + INT_H_SIZE;
475                 pktrem = pktsz - INT_H_SIZE;
476
477         } while (1);
478         msg_set_type(buf_msg(skb), LAST_FRAGMENT);
479         return dsz;
480 error:
481         __skb_queue_purge(list);
482         __skb_queue_head_init(list);
483         return rc;
484 }
485
486 /**
487  * tipc_msg_bundle - Append contents of a buffer to tail of an existing one
488  * @bskb: the bundle buffer to append to
489  * @msg: message to be appended
490  * @max: max allowable size for the bundle buffer
491  *
492  * Return: "true" if bundling has been performed, otherwise "false"
493  */
494 static bool tipc_msg_bundle(struct sk_buff *bskb, struct tipc_msg *msg,
495                             u32 max)
496 {
497         struct tipc_msg *bmsg = buf_msg(bskb);
498         u32 msz, bsz, offset, pad;
499
500         msz = msg_size(msg);
501         bsz = msg_size(bmsg);
502         offset = align(bsz);
503         pad = offset - bsz;
504
505         if (unlikely(skb_tailroom(bskb) < (pad + msz)))
506                 return false;
507         if (unlikely(max < (offset + msz)))
508                 return false;
509
510         skb_put(bskb, pad + msz);
511         skb_copy_to_linear_data_offset(bskb, offset, msg, msz);
512         msg_set_size(bmsg, offset + msz);
513         msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
514         return true;
515 }
516
517 /**
518  * tipc_msg_try_bundle - Try to bundle a new message to the last one
519  * @tskb: the last/target message to which the new one will be appended
520  * @skb: the new message skb pointer
521  * @mss: max message size (header inclusive)
522  * @dnode: destination node for the message
523  * @new_bundle: if this call made a new bundle or not
524  *
525  * Return: "true" if the new message skb is potential for bundling this time or
526  * later, in the case a bundling has been done this time, the skb is consumed
527  * (the skb pointer = NULL).
528  * Otherwise, "false" if the skb cannot be bundled at all.
529  */
530 bool tipc_msg_try_bundle(struct sk_buff *tskb, struct sk_buff **skb, u32 mss,
531                          u32 dnode, bool *new_bundle)
532 {
533         struct tipc_msg *msg, *inner, *outer;
534         u32 tsz;
535
536         /* First, check if the new buffer is suitable for bundling */
537         msg = buf_msg(*skb);
538         if (msg_user(msg) == MSG_FRAGMENTER)
539                 return false;
540         if (msg_user(msg) == TUNNEL_PROTOCOL)
541                 return false;
542         if (msg_user(msg) == BCAST_PROTOCOL)
543                 return false;
544         if (mss <= INT_H_SIZE + msg_size(msg))
545                 return false;
546
547         /* Ok, but the last/target buffer can be empty? */
548         if (unlikely(!tskb))
549                 return true;
550
551         /* Is it a bundle already? Try to bundle the new message to it */
552         if (msg_user(buf_msg(tskb)) == MSG_BUNDLER) {
553                 *new_bundle = false;
554                 goto bundle;
555         }
556
557         /* Make a new bundle of the two messages if possible */
558         tsz = msg_size(buf_msg(tskb));
559         if (unlikely(mss < align(INT_H_SIZE + tsz) + msg_size(msg)))
560                 return true;
561         if (unlikely(pskb_expand_head(tskb, INT_H_SIZE, mss - tsz - INT_H_SIZE,
562                                       GFP_ATOMIC)))
563                 return true;
564         inner = buf_msg(tskb);
565         skb_push(tskb, INT_H_SIZE);
566         outer = buf_msg(tskb);
567         tipc_msg_init(msg_prevnode(inner), outer, MSG_BUNDLER, 0, INT_H_SIZE,
568                       dnode);
569         msg_set_importance(outer, msg_importance(inner));
570         msg_set_size(outer, INT_H_SIZE + tsz);
571         msg_set_msgcnt(outer, 1);
572         *new_bundle = true;
573
574 bundle:
575         if (likely(tipc_msg_bundle(tskb, msg, mss))) {
576                 consume_skb(*skb);
577                 *skb = NULL;
578         }
579         return true;
580 }
581
582 /**
583  *  tipc_msg_extract(): extract bundled inner packet from buffer
584  *  @skb: buffer to be extracted from.
585  *  @iskb: extracted inner buffer, to be returned
586  *  @pos: position in outer message of msg to be extracted.
587  *  Returns position of next msg.
588  *  Consumes outer buffer when last packet extracted
589  *  Return: true when there is an extracted buffer, otherwise false
590  */
591 bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)
592 {
593         struct tipc_msg *hdr, *ihdr;
594         int imsz;
595
596         *iskb = NULL;
597         if (unlikely(skb_linearize(skb)))
598                 goto none;
599
600         hdr = buf_msg(skb);
601         if (unlikely(*pos > (msg_data_sz(hdr) - MIN_H_SIZE)))
602                 goto none;
603
604         ihdr = (struct tipc_msg *)(msg_data(hdr) + *pos);
605         imsz = msg_size(ihdr);
606
607         if ((*pos + imsz) > msg_data_sz(hdr))
608                 goto none;
609
610         *iskb = tipc_buf_acquire(imsz, GFP_ATOMIC);
611         if (!*iskb)
612                 goto none;
613
614         skb_copy_to_linear_data(*iskb, ihdr, imsz);
615         if (unlikely(!tipc_msg_validate(iskb)))
616                 goto none;
617
618         *pos += align(imsz);
619         return true;
620 none:
621         kfree_skb(skb);
622         kfree_skb(*iskb);
623         *iskb = NULL;
624         return false;
625 }
626
627 /**
628  * tipc_msg_reverse(): swap source and destination addresses and add error code
629  * @own_node: originating node id for reversed message
630  * @skb:  buffer containing message to be reversed; will be consumed
631  * @err:  error code to be set in message, if any
632  * Replaces consumed buffer with new one when successful
633  * Return: true if success, otherwise false
634  */
635 bool tipc_msg_reverse(u32 own_node,  struct sk_buff **skb, int err)
636 {
637         struct sk_buff *_skb = *skb;
638         struct tipc_msg *_hdr, *hdr;
639         int hlen, dlen;
640
641         if (skb_linearize(_skb))
642                 goto exit;
643         _hdr = buf_msg(_skb);
644         dlen = min_t(uint, msg_data_sz(_hdr), MAX_FORWARD_SIZE);
645         hlen = msg_hdr_sz(_hdr);
646
647         if (msg_dest_droppable(_hdr))
648                 goto exit;
649         if (msg_errcode(_hdr))
650                 goto exit;
651
652         /* Never return SHORT header */
653         if (hlen == SHORT_H_SIZE)
654                 hlen = BASIC_H_SIZE;
655
656         /* Don't return data along with SYN+, - sender has a clone */
657         if (msg_is_syn(_hdr) && err == TIPC_ERR_OVERLOAD)
658                 dlen = 0;
659
660         /* Allocate new buffer to return */
661         *skb = tipc_buf_acquire(hlen + dlen, GFP_ATOMIC);
662         if (!*skb)
663                 goto exit;
664         memcpy((*skb)->data, _skb->data, msg_hdr_sz(_hdr));
665         memcpy((*skb)->data + hlen, msg_data(_hdr), dlen);
666
667         /* Build reverse header in new buffer */
668         hdr = buf_msg(*skb);
669         msg_set_hdr_sz(hdr, hlen);
670         msg_set_errcode(hdr, err);
671         msg_set_non_seq(hdr, 0);
672         msg_set_origport(hdr, msg_destport(_hdr));
673         msg_set_destport(hdr, msg_origport(_hdr));
674         msg_set_destnode(hdr, msg_prevnode(_hdr));
675         msg_set_prevnode(hdr, own_node);
676         msg_set_orignode(hdr, own_node);
677         msg_set_size(hdr, hlen + dlen);
678         skb_orphan(_skb);
679         kfree_skb(_skb);
680         return true;
681 exit:
682         kfree_skb(_skb);
683         *skb = NULL;
684         return false;
685 }
686
687 bool tipc_msg_skb_clone(struct sk_buff_head *msg, struct sk_buff_head *cpy)
688 {
689         struct sk_buff *skb, *_skb;
690
691         skb_queue_walk(msg, skb) {
692                 _skb = skb_clone(skb, GFP_ATOMIC);
693                 if (!_skb) {
694                         __skb_queue_purge(cpy);
695                         pr_err_ratelimited("Failed to clone buffer chain\n");
696                         return false;
697                 }
698                 __skb_queue_tail(cpy, _skb);
699         }
700         return true;
701 }
702
703 /**
704  * tipc_msg_lookup_dest(): try to find new destination for named message
705  * @net: pointer to associated network namespace
706  * @skb: the buffer containing the message.
707  * @err: error code to be used by caller if lookup fails
708  * Does not consume buffer
709  * Return: true if a destination is found, false otherwise
710  */
711 bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
712 {
713         struct tipc_msg *msg = buf_msg(skb);
714         u32 dport, dnode;
715         u32 onode = tipc_own_addr(net);
716
717         if (!msg_isdata(msg))
718                 return false;
719         if (!msg_named(msg))
720                 return false;
721         if (msg_errcode(msg))
722                 return false;
723         *err = TIPC_ERR_NO_NAME;
724         if (skb_linearize(skb))
725                 return false;
726         msg = buf_msg(skb);
727         if (msg_reroute_cnt(msg))
728                 return false;
729         dnode = tipc_scope2node(net, msg_lookup_scope(msg));
730         dport = tipc_nametbl_translate(net, msg_nametype(msg),
731                                        msg_nameinst(msg), &dnode);
732         if (!dport)
733                 return false;
734         msg_incr_reroute_cnt(msg);
735         if (dnode != onode)
736                 msg_set_prevnode(msg, onode);
737         msg_set_destnode(msg, dnode);
738         msg_set_destport(msg, dport);
739         *err = TIPC_OK;
740
741         return true;
742 }
743
744 /* tipc_msg_assemble() - assemble chain of fragments into one message
745  */
746 bool tipc_msg_assemble(struct sk_buff_head *list)
747 {
748         struct sk_buff *skb, *tmp = NULL;
749
750         if (skb_queue_len(list) == 1)
751                 return true;
752
753         while ((skb = __skb_dequeue(list))) {
754                 skb->next = NULL;
755                 if (tipc_buf_append(&tmp, &skb)) {
756                         __skb_queue_tail(list, skb);
757                         return true;
758                 }
759                 if (!tmp)
760                         break;
761         }
762         __skb_queue_purge(list);
763         __skb_queue_head_init(list);
764         pr_warn("Failed do assemble buffer\n");
765         return false;
766 }
767
768 /* tipc_msg_reassemble() - clone a buffer chain of fragments and
769  *                         reassemble the clones into one message
770  */
771 bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq)
772 {
773         struct sk_buff *skb, *_skb;
774         struct sk_buff *frag = NULL;
775         struct sk_buff *head = NULL;
776         int hdr_len;
777
778         /* Copy header if single buffer */
779         if (skb_queue_len(list) == 1) {
780                 skb = skb_peek(list);
781                 hdr_len = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
782                 _skb = __pskb_copy(skb, hdr_len, GFP_ATOMIC);
783                 if (!_skb)
784                         return false;
785                 __skb_queue_tail(rcvq, _skb);
786                 return true;
787         }
788
789         /* Clone all fragments and reassemble */
790         skb_queue_walk(list, skb) {
791                 frag = skb_clone(skb, GFP_ATOMIC);
792                 if (!frag)
793                         goto error;
794                 frag->next = NULL;
795                 if (tipc_buf_append(&head, &frag))
796                         break;
797                 if (!head)
798                         goto error;
799         }
800         __skb_queue_tail(rcvq, frag);
801         return true;
802 error:
803         pr_warn("Failed do clone local mcast rcv buffer\n");
804         kfree_skb(head);
805         return false;
806 }
807
808 bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
809                         struct sk_buff_head *cpy)
810 {
811         struct sk_buff *skb, *_skb;
812
813         skb_queue_walk(msg, skb) {
814                 _skb = pskb_copy(skb, GFP_ATOMIC);
815                 if (!_skb) {
816                         __skb_queue_purge(cpy);
817                         return false;
818                 }
819                 msg_set_destnode(buf_msg(_skb), dst);
820                 __skb_queue_tail(cpy, _skb);
821         }
822         return true;
823 }
824
825 /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
826  * @list: list to be appended to
827  * @seqno: sequence number of buffer to add
828  * @skb: buffer to add
829  */
830 bool __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
831                              struct sk_buff *skb)
832 {
833         struct sk_buff *_skb, *tmp;
834
835         if (skb_queue_empty(list) || less(seqno, buf_seqno(skb_peek(list)))) {
836                 __skb_queue_head(list, skb);
837                 return true;
838         }
839
840         if (more(seqno, buf_seqno(skb_peek_tail(list)))) {
841                 __skb_queue_tail(list, skb);
842                 return true;
843         }
844
845         skb_queue_walk_safe(list, _skb, tmp) {
846                 if (more(seqno, buf_seqno(_skb)))
847                         continue;
848                 if (seqno == buf_seqno(_skb))
849                         break;
850                 __skb_queue_before(list, _skb, skb);
851                 return true;
852         }
853         kfree_skb(skb);
854         return false;
855 }
856
857 void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb,
858                      struct sk_buff_head *xmitq)
859 {
860         if (tipc_msg_reverse(tipc_own_addr(net), &skb, err))
861                 __skb_queue_tail(xmitq, skb);
862 }