MIPS: Loongson: Reduce possible loop times and add log in do_thermal_timer()
[linux-2.6-microblaze.git] / net / rxrpc / recvmsg.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC recvmsg() implementation
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9
10 #include <linux/net.h>
11 #include <linux/skbuff.h>
12 #include <linux/export.h>
13 #include <linux/sched/signal.h>
14
15 #include <net/sock.h>
16 #include <net/af_rxrpc.h>
17 #include "ar-internal.h"
18
19 /*
20  * Post a call for attention by the socket or kernel service.  Further
21  * notifications are suppressed by putting recvmsg_link on a dummy queue.
22  */
23 void rxrpc_notify_socket(struct rxrpc_call *call)
24 {
25         struct rxrpc_sock *rx;
26         struct sock *sk;
27
28         _enter("%d", call->debug_id);
29
30         if (!list_empty(&call->recvmsg_link))
31                 return;
32
33         rcu_read_lock();
34
35         rx = rcu_dereference(call->socket);
36         sk = &rx->sk;
37         if (rx && sk->sk_state < RXRPC_CLOSE) {
38                 if (call->notify_rx) {
39                         spin_lock_bh(&call->notify_lock);
40                         call->notify_rx(sk, call, call->user_call_ID);
41                         spin_unlock_bh(&call->notify_lock);
42                 } else {
43                         write_lock_bh(&rx->recvmsg_lock);
44                         if (list_empty(&call->recvmsg_link)) {
45                                 rxrpc_get_call(call, rxrpc_call_got);
46                                 list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
47                         }
48                         write_unlock_bh(&rx->recvmsg_lock);
49
50                         if (!sock_flag(sk, SOCK_DEAD)) {
51                                 _debug("call %ps", sk->sk_data_ready);
52                                 sk->sk_data_ready(sk);
53                         }
54                 }
55         }
56
57         rcu_read_unlock();
58         _leave("");
59 }
60
61 /*
62  * Transition a call to the complete state.
63  */
64 bool __rxrpc_set_call_completion(struct rxrpc_call *call,
65                                  enum rxrpc_call_completion compl,
66                                  u32 abort_code,
67                                  int error)
68 {
69         if (call->state < RXRPC_CALL_COMPLETE) {
70                 call->abort_code = abort_code;
71                 call->error = error;
72                 call->completion = compl,
73                 call->state = RXRPC_CALL_COMPLETE;
74                 trace_rxrpc_call_complete(call);
75                 wake_up(&call->waitq);
76                 rxrpc_notify_socket(call);
77                 return true;
78         }
79         return false;
80 }
81
82 bool rxrpc_set_call_completion(struct rxrpc_call *call,
83                                enum rxrpc_call_completion compl,
84                                u32 abort_code,
85                                int error)
86 {
87         bool ret = false;
88
89         if (call->state < RXRPC_CALL_COMPLETE) {
90                 write_lock_bh(&call->state_lock);
91                 ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
92                 write_unlock_bh(&call->state_lock);
93         }
94         return ret;
95 }
96
97 /*
98  * Record that a call successfully completed.
99  */
100 bool __rxrpc_call_completed(struct rxrpc_call *call)
101 {
102         return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
103 }
104
105 bool rxrpc_call_completed(struct rxrpc_call *call)
106 {
107         bool ret = false;
108
109         if (call->state < RXRPC_CALL_COMPLETE) {
110                 write_lock_bh(&call->state_lock);
111                 ret = __rxrpc_call_completed(call);
112                 write_unlock_bh(&call->state_lock);
113         }
114         return ret;
115 }
116
117 /*
118  * Record that a call is locally aborted.
119  */
120 bool __rxrpc_abort_call(const char *why, struct rxrpc_call *call,
121                         rxrpc_seq_t seq, u32 abort_code, int error)
122 {
123         trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq,
124                           abort_code, error);
125         return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
126                                            abort_code, error);
127 }
128
129 bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
130                       rxrpc_seq_t seq, u32 abort_code, int error)
131 {
132         bool ret;
133
134         write_lock_bh(&call->state_lock);
135         ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
136         write_unlock_bh(&call->state_lock);
137         return ret;
138 }
139
140 /*
141  * Pass a call terminating message to userspace.
142  */
143 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
144 {
145         u32 tmp = 0;
146         int ret;
147
148         switch (call->completion) {
149         case RXRPC_CALL_SUCCEEDED:
150                 ret = 0;
151                 if (rxrpc_is_service_call(call))
152                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
153                 break;
154         case RXRPC_CALL_REMOTELY_ABORTED:
155                 tmp = call->abort_code;
156                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
157                 break;
158         case RXRPC_CALL_LOCALLY_ABORTED:
159                 tmp = call->abort_code;
160                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
161                 break;
162         case RXRPC_CALL_NETWORK_ERROR:
163                 tmp = -call->error;
164                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
165                 break;
166         case RXRPC_CALL_LOCAL_ERROR:
167                 tmp = -call->error;
168                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
169                 break;
170         default:
171                 pr_err("Invalid terminal call state %u\n", call->state);
172                 BUG();
173                 break;
174         }
175
176         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
177                             call->rx_pkt_offset, call->rx_pkt_len, ret);
178         return ret;
179 }
180
181 /*
182  * Pass back notification of a new call.  The call is added to the
183  * to-be-accepted list.  This means that the next call to be accepted might not
184  * be the last call seen awaiting acceptance, but unless we leave this on the
185  * front of the queue and block all other messages until someone gives us a
186  * user_ID for it, there's not a lot we can do.
187  */
188 static int rxrpc_recvmsg_new_call(struct rxrpc_sock *rx,
189                                   struct rxrpc_call *call,
190                                   struct msghdr *msg, int flags)
191 {
192         int tmp = 0, ret;
193
194         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &tmp);
195
196         if (ret == 0 && !(flags & MSG_PEEK)) {
197                 _debug("to be accepted");
198                 write_lock_bh(&rx->recvmsg_lock);
199                 list_del_init(&call->recvmsg_link);
200                 write_unlock_bh(&rx->recvmsg_lock);
201
202                 rxrpc_get_call(call, rxrpc_call_got);
203                 write_lock(&rx->call_lock);
204                 list_add_tail(&call->accept_link, &rx->to_be_accepted);
205                 write_unlock(&rx->call_lock);
206         }
207
208         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_to_be_accepted, 1, 0, 0, ret);
209         return ret;
210 }
211
212 /*
213  * End the packet reception phase.
214  */
215 static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
216 {
217         _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
218
219         trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
220         ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
221
222         if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
223                 rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, false, true,
224                                   rxrpc_propose_ack_terminal_ack);
225                 //rxrpc_send_ack_packet(call, false, NULL);
226         }
227
228         write_lock_bh(&call->state_lock);
229
230         switch (call->state) {
231         case RXRPC_CALL_CLIENT_RECV_REPLY:
232                 __rxrpc_call_completed(call);
233                 write_unlock_bh(&call->state_lock);
234                 break;
235
236         case RXRPC_CALL_SERVER_RECV_REQUEST:
237                 call->tx_phase = true;
238                 call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
239                 call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
240                 write_unlock_bh(&call->state_lock);
241                 rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false, true,
242                                   rxrpc_propose_ack_processing_op);
243                 break;
244         default:
245                 write_unlock_bh(&call->state_lock);
246                 break;
247         }
248 }
249
250 /*
251  * Discard a packet we've used up and advance the Rx window by one.
252  */
253 static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
254 {
255         struct rxrpc_skb_priv *sp;
256         struct sk_buff *skb;
257         rxrpc_serial_t serial;
258         rxrpc_seq_t hard_ack, top;
259         bool last = false;
260         u8 subpacket;
261         int ix;
262
263         _enter("%d", call->debug_id);
264
265         hard_ack = call->rx_hard_ack;
266         top = smp_load_acquire(&call->rx_top);
267         ASSERT(before(hard_ack, top));
268
269         hard_ack++;
270         ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
271         skb = call->rxtx_buffer[ix];
272         rxrpc_see_skb(skb, rxrpc_skb_rotated);
273         sp = rxrpc_skb(skb);
274
275         subpacket = call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
276         serial = sp->hdr.serial + subpacket;
277
278         if (subpacket == sp->nr_subpackets - 1 &&
279             sp->rx_flags & RXRPC_SKB_INCL_LAST)
280                 last = true;
281
282         call->rxtx_buffer[ix] = NULL;
283         call->rxtx_annotations[ix] = 0;
284         /* Barrier against rxrpc_input_data(). */
285         smp_store_release(&call->rx_hard_ack, hard_ack);
286
287         rxrpc_free_skb(skb, rxrpc_skb_freed);
288
289         trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
290         if (last) {
291                 rxrpc_end_rx_phase(call, serial);
292         } else {
293                 /* Check to see if there's an ACK that needs sending. */
294                 if (after_eq(hard_ack, call->ackr_consumed + 2) ||
295                     after_eq(top, call->ackr_seen + 2) ||
296                     (hard_ack == top && after(hard_ack, call->ackr_consumed)))
297                         rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial,
298                                           true, true,
299                                           rxrpc_propose_ack_rotate_rx);
300                 if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)
301                         rxrpc_send_ack_packet(call, false, NULL);
302         }
303 }
304
305 /*
306  * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
307  * padding, but if this is the case, the packet length will be resident in the
308  * socket buffer.  Note that we can't modify the master skb info as the skb may
309  * be the home to multiple subpackets.
310  */
311 static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
312                                u8 annotation,
313                                unsigned int offset, unsigned int len)
314 {
315         struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
316         rxrpc_seq_t seq = sp->hdr.seq;
317         u16 cksum = sp->hdr.cksum;
318         u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
319
320         _enter("");
321
322         /* For all but the head jumbo subpacket, the security checksum is in a
323          * jumbo header immediately prior to the data.
324          */
325         if (subpacket > 0) {
326                 __be16 tmp;
327                 if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
328                         BUG();
329                 cksum = ntohs(tmp);
330                 seq += subpacket;
331         }
332
333         return call->security->verify_packet(call, skb, offset, len,
334                                              seq, cksum);
335 }
336
337 /*
338  * Locate the data within a packet.  This is complicated by:
339  *
340  * (1) An skb may contain a jumbo packet - so we have to find the appropriate
341  *     subpacket.
342  *
343  * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
344  *     contains an extra header which includes the true length of the data,
345  *     excluding any encrypted padding.
346  */
347 static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
348                              u8 *_annotation,
349                              unsigned int *_offset, unsigned int *_len,
350                              bool *_last)
351 {
352         struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
353         unsigned int offset = sizeof(struct rxrpc_wire_header);
354         unsigned int len;
355         bool last = false;
356         int ret;
357         u8 annotation = *_annotation;
358         u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
359
360         /* Locate the subpacket */
361         offset += subpacket * RXRPC_JUMBO_SUBPKTLEN;
362         len = skb->len - offset;
363         if (subpacket < sp->nr_subpackets - 1)
364                 len = RXRPC_JUMBO_DATALEN;
365         else if (sp->rx_flags & RXRPC_SKB_INCL_LAST)
366                 last = true;
367
368         if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
369                 ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
370                 if (ret < 0)
371                         return ret;
372                 *_annotation |= RXRPC_RX_ANNO_VERIFIED;
373         }
374
375         *_offset = offset;
376         *_len = len;
377         *_last = last;
378         call->security->locate_data(call, skb, _offset, _len);
379         return 0;
380 }
381
382 /*
383  * Deliver messages to a call.  This keeps processing packets until the buffer
384  * is filled and we find either more DATA (returns 0) or the end of the DATA
385  * (returns 1).  If more packets are required, it returns -EAGAIN.
386  */
387 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
388                               struct msghdr *msg, struct iov_iter *iter,
389                               size_t len, int flags, size_t *_offset)
390 {
391         struct rxrpc_skb_priv *sp;
392         struct sk_buff *skb;
393         rxrpc_serial_t serial;
394         rxrpc_seq_t hard_ack, top, seq;
395         size_t remain;
396         bool rx_pkt_last;
397         unsigned int rx_pkt_offset, rx_pkt_len;
398         int ix, copy, ret = -EAGAIN, ret2;
399
400         if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) &&
401             call->ackr_reason)
402                 rxrpc_send_ack_packet(call, false, NULL);
403
404         rx_pkt_offset = call->rx_pkt_offset;
405         rx_pkt_len = call->rx_pkt_len;
406         rx_pkt_last = call->rx_pkt_last;
407
408         if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
409                 seq = call->rx_hard_ack;
410                 ret = 1;
411                 goto done;
412         }
413
414         /* Barriers against rxrpc_input_data(). */
415         hard_ack = call->rx_hard_ack;
416         seq = hard_ack + 1;
417
418         while (top = smp_load_acquire(&call->rx_top),
419                before_eq(seq, top)
420                ) {
421                 ix = seq & RXRPC_RXTX_BUFF_MASK;
422                 skb = call->rxtx_buffer[ix];
423                 if (!skb) {
424                         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
425                                             rx_pkt_offset, rx_pkt_len, 0);
426                         break;
427                 }
428                 smp_rmb();
429                 rxrpc_see_skb(skb, rxrpc_skb_seen);
430                 sp = rxrpc_skb(skb);
431
432                 if (!(flags & MSG_PEEK)) {
433                         serial = sp->hdr.serial;
434                         serial += call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
435                         trace_rxrpc_receive(call, rxrpc_receive_front,
436                                             serial, seq);
437                 }
438
439                 if (msg)
440                         sock_recv_timestamp(msg, sock->sk, skb);
441
442                 if (rx_pkt_offset == 0) {
443                         ret2 = rxrpc_locate_data(call, skb,
444                                                  &call->rxtx_annotations[ix],
445                                                  &rx_pkt_offset, &rx_pkt_len,
446                                                  &rx_pkt_last);
447                         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq,
448                                             rx_pkt_offset, rx_pkt_len, ret2);
449                         if (ret2 < 0) {
450                                 ret = ret2;
451                                 goto out;
452                         }
453                 } else {
454                         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq,
455                                             rx_pkt_offset, rx_pkt_len, 0);
456                 }
457
458                 /* We have to handle short, empty and used-up DATA packets. */
459                 remain = len - *_offset;
460                 copy = rx_pkt_len;
461                 if (copy > remain)
462                         copy = remain;
463                 if (copy > 0) {
464                         ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
465                                                       copy);
466                         if (ret2 < 0) {
467                                 ret = ret2;
468                                 goto out;
469                         }
470
471                         /* handle piecemeal consumption of data packets */
472                         rx_pkt_offset += copy;
473                         rx_pkt_len -= copy;
474                         *_offset += copy;
475                 }
476
477                 if (rx_pkt_len > 0) {
478                         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
479                                             rx_pkt_offset, rx_pkt_len, 0);
480                         ASSERTCMP(*_offset, ==, len);
481                         ret = 0;
482                         break;
483                 }
484
485                 /* The whole packet has been transferred. */
486                 if (!(flags & MSG_PEEK))
487                         rxrpc_rotate_rx_window(call);
488                 rx_pkt_offset = 0;
489                 rx_pkt_len = 0;
490
491                 if (rx_pkt_last) {
492                         ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
493                         ret = 1;
494                         goto out;
495                 }
496
497                 seq++;
498         }
499
500 out:
501         if (!(flags & MSG_PEEK)) {
502                 call->rx_pkt_offset = rx_pkt_offset;
503                 call->rx_pkt_len = rx_pkt_len;
504                 call->rx_pkt_last = rx_pkt_last;
505         }
506 done:
507         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq,
508                             rx_pkt_offset, rx_pkt_len, ret);
509         if (ret == -EAGAIN)
510                 set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags);
511         return ret;
512 }
513
514 /*
515  * Receive a message from an RxRPC socket
516  * - we need to be careful about two or more threads calling recvmsg
517  *   simultaneously
518  */
519 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
520                   int flags)
521 {
522         struct rxrpc_call *call;
523         struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
524         struct list_head *l;
525         size_t copied = 0;
526         long timeo;
527         int ret;
528
529         DEFINE_WAIT(wait);
530
531         trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0);
532
533         if (flags & (MSG_OOB | MSG_TRUNC))
534                 return -EOPNOTSUPP;
535
536         timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
537
538 try_again:
539         lock_sock(&rx->sk);
540
541         /* Return immediately if a client socket has no outstanding calls */
542         if (RB_EMPTY_ROOT(&rx->calls) &&
543             list_empty(&rx->recvmsg_q) &&
544             rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
545                 release_sock(&rx->sk);
546                 return -ENODATA;
547         }
548
549         if (list_empty(&rx->recvmsg_q)) {
550                 ret = -EWOULDBLOCK;
551                 if (timeo == 0) {
552                         call = NULL;
553                         goto error_no_call;
554                 }
555
556                 release_sock(&rx->sk);
557
558                 /* Wait for something to happen */
559                 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
560                                           TASK_INTERRUPTIBLE);
561                 ret = sock_error(&rx->sk);
562                 if (ret)
563                         goto wait_error;
564
565                 if (list_empty(&rx->recvmsg_q)) {
566                         if (signal_pending(current))
567                                 goto wait_interrupted;
568                         trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait,
569                                             0, 0, 0, 0);
570                         timeo = schedule_timeout(timeo);
571                 }
572                 finish_wait(sk_sleep(&rx->sk), &wait);
573                 goto try_again;
574         }
575
576         /* Find the next call and dequeue it if we're not just peeking.  If we
577          * do dequeue it, that comes with a ref that we will need to release.
578          */
579         write_lock_bh(&rx->recvmsg_lock);
580         l = rx->recvmsg_q.next;
581         call = list_entry(l, struct rxrpc_call, recvmsg_link);
582         if (!(flags & MSG_PEEK))
583                 list_del_init(&call->recvmsg_link);
584         else
585                 rxrpc_get_call(call, rxrpc_call_got);
586         write_unlock_bh(&rx->recvmsg_lock);
587
588         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
589
590         /* We're going to drop the socket lock, so we need to lock the call
591          * against interference by sendmsg.
592          */
593         if (!mutex_trylock(&call->user_mutex)) {
594                 ret = -EWOULDBLOCK;
595                 if (flags & MSG_DONTWAIT)
596                         goto error_requeue_call;
597                 ret = -ERESTARTSYS;
598                 if (mutex_lock_interruptible(&call->user_mutex) < 0)
599                         goto error_requeue_call;
600         }
601
602         release_sock(&rx->sk);
603
604         if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
605                 BUG();
606
607         if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
608                 if (flags & MSG_CMSG_COMPAT) {
609                         unsigned int id32 = call->user_call_ID;
610
611                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
612                                        sizeof(unsigned int), &id32);
613                 } else {
614                         unsigned long idl = call->user_call_ID;
615
616                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
617                                        sizeof(unsigned long), &idl);
618                 }
619                 if (ret < 0)
620                         goto error_unlock_call;
621         }
622
623         if (msg->msg_name) {
624                 struct sockaddr_rxrpc *srx = msg->msg_name;
625                 size_t len = sizeof(call->peer->srx);
626
627                 memcpy(msg->msg_name, &call->peer->srx, len);
628                 srx->srx_service = call->service_id;
629                 msg->msg_namelen = len;
630         }
631
632         switch (READ_ONCE(call->state)) {
633         case RXRPC_CALL_SERVER_ACCEPTING:
634                 ret = rxrpc_recvmsg_new_call(rx, call, msg, flags);
635                 break;
636         case RXRPC_CALL_CLIENT_RECV_REPLY:
637         case RXRPC_CALL_SERVER_RECV_REQUEST:
638         case RXRPC_CALL_SERVER_ACK_REQUEST:
639                 ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
640                                          flags, &copied);
641                 if (ret == -EAGAIN)
642                         ret = 0;
643
644                 if (after(call->rx_top, call->rx_hard_ack) &&
645                     call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
646                         rxrpc_notify_socket(call);
647                 break;
648         default:
649                 ret = 0;
650                 break;
651         }
652
653         if (ret < 0)
654                 goto error_unlock_call;
655
656         if (call->state == RXRPC_CALL_COMPLETE) {
657                 ret = rxrpc_recvmsg_term(call, msg);
658                 if (ret < 0)
659                         goto error_unlock_call;
660                 if (!(flags & MSG_PEEK))
661                         rxrpc_release_call(rx, call);
662                 msg->msg_flags |= MSG_EOR;
663                 ret = 1;
664         }
665
666         if (ret == 0)
667                 msg->msg_flags |= MSG_MORE;
668         else
669                 msg->msg_flags &= ~MSG_MORE;
670         ret = copied;
671
672 error_unlock_call:
673         mutex_unlock(&call->user_mutex);
674         rxrpc_put_call(call, rxrpc_call_put);
675         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
676         return ret;
677
678 error_requeue_call:
679         if (!(flags & MSG_PEEK)) {
680                 write_lock_bh(&rx->recvmsg_lock);
681                 list_add(&call->recvmsg_link, &rx->recvmsg_q);
682                 write_unlock_bh(&rx->recvmsg_lock);
683                 trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
684         } else {
685                 rxrpc_put_call(call, rxrpc_call_put);
686         }
687 error_no_call:
688         release_sock(&rx->sk);
689 error_trace:
690         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
691         return ret;
692
693 wait_interrupted:
694         ret = sock_intr_errno(timeo);
695 wait_error:
696         finish_wait(sk_sleep(&rx->sk), &wait);
697         call = NULL;
698         goto error_trace;
699 }
700
701 /**
702  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
703  * @sock: The socket that the call exists on
704  * @call: The call to send data through
705  * @iter: The buffer to receive into
706  * @want_more: True if more data is expected to be read
707  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
708  * @_service: Where to store the actual service ID (may be upgraded)
709  *
710  * Allow a kernel service to receive data and pick up information about the
711  * state of a call.  Returns 0 if got what was asked for and there's more
712  * available, 1 if we got what was asked for and we're at the end of the data
713  * and -EAGAIN if we need more data.
714  *
715  * Note that we may return -EAGAIN to drain empty packets at the end of the
716  * data, even if we've already copied over the requested data.
717  *
718  * *_abort should also be initialised to 0.
719  */
720 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
721                            struct iov_iter *iter,
722                            bool want_more, u32 *_abort, u16 *_service)
723 {
724         size_t offset = 0;
725         int ret;
726
727         _enter("{%d,%s},%zu,%d",
728                call->debug_id, rxrpc_call_states[call->state],
729                iov_iter_count(iter), want_more);
730
731         ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
732
733         mutex_lock(&call->user_mutex);
734
735         switch (READ_ONCE(call->state)) {
736         case RXRPC_CALL_CLIENT_RECV_REPLY:
737         case RXRPC_CALL_SERVER_RECV_REQUEST:
738         case RXRPC_CALL_SERVER_ACK_REQUEST:
739                 ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
740                                          iov_iter_count(iter), 0,
741                                          &offset);
742                 if (ret < 0)
743                         goto out;
744
745                 /* We can only reach here with a partially full buffer if we
746                  * have reached the end of the data.  We must otherwise have a
747                  * full buffer or have been given -EAGAIN.
748                  */
749                 if (ret == 1) {
750                         if (iov_iter_count(iter) > 0)
751                                 goto short_data;
752                         if (!want_more)
753                                 goto read_phase_complete;
754                         ret = 0;
755                         goto out;
756                 }
757
758                 if (!want_more)
759                         goto excess_data;
760                 goto out;
761
762         case RXRPC_CALL_COMPLETE:
763                 goto call_complete;
764
765         default:
766                 ret = -EINPROGRESS;
767                 goto out;
768         }
769
770 read_phase_complete:
771         ret = 1;
772 out:
773         switch (call->ackr_reason) {
774         case RXRPC_ACK_IDLE:
775                 break;
776         case RXRPC_ACK_DELAY:
777                 if (ret != -EAGAIN)
778                         break;
779                 /* Fall through */
780         default:
781                 rxrpc_send_ack_packet(call, false, NULL);
782         }
783
784         if (_service)
785                 *_service = call->service_id;
786         mutex_unlock(&call->user_mutex);
787         _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
788         return ret;
789
790 short_data:
791         trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
792         ret = -EBADMSG;
793         goto out;
794 excess_data:
795         trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
796         ret = -EMSGSIZE;
797         goto out;
798 call_complete:
799         *_abort = call->abort_code;
800         ret = call->error;
801         if (call->completion == RXRPC_CALL_SUCCEEDED) {
802                 ret = 1;
803                 if (iov_iter_count(iter) > 0)
804                         ret = -ECONNRESET;
805         }
806         goto out;
807 }
808 EXPORT_SYMBOL(rxrpc_kernel_recv_data);
809
810 /**
811  * rxrpc_kernel_get_reply_time - Get timestamp on first reply packet
812  * @sock: The socket that the call exists on
813  * @call: The call to query
814  * @_ts: Where to put the timestamp
815  *
816  * Retrieve the timestamp from the first DATA packet of the reply if it is
817  * in the ring.  Returns true if successful, false if not.
818  */
819 bool rxrpc_kernel_get_reply_time(struct socket *sock, struct rxrpc_call *call,
820                                  ktime_t *_ts)
821 {
822         struct sk_buff *skb;
823         rxrpc_seq_t hard_ack, top, seq;
824         bool success = false;
825
826         mutex_lock(&call->user_mutex);
827
828         if (READ_ONCE(call->state) != RXRPC_CALL_CLIENT_RECV_REPLY)
829                 goto out;
830
831         hard_ack = call->rx_hard_ack;
832         if (hard_ack != 0)
833                 goto out;
834
835         seq = hard_ack + 1;
836         top = smp_load_acquire(&call->rx_top);
837         if (after(seq, top))
838                 goto out;
839
840         skb = call->rxtx_buffer[seq & RXRPC_RXTX_BUFF_MASK];
841         if (!skb)
842                 goto out;
843
844         *_ts = skb_get_ktime(skb);
845         success = true;
846
847 out:
848         mutex_unlock(&call->user_mutex);
849         return success;
850 }
851 EXPORT_SYMBOL(rxrpc_kernel_get_reply_time);