{
if (qdisc->flags & TCQ_F_NOLOCK) {
if (spin_trylock(&qdisc->seqlock))
- goto nolock_empty;
+ return true;
+ /* Paired with smp_mb__after_atomic() to make sure
+ * STATE_MISSED checking is synchronized with clearing
+ * in pfifo_fast_dequeue().
+ */
+ smp_mb__before_atomic();
+
/* If the MISSED flag is set, it means other thread has
* set the MISSED flag before second spin_trylock(), so
* we can return false here to avoid multi cpus doing
return !empty;
}
-static int tcp_msg_wait_data(struct sock *sk, struct sk_psock *psock, int flags,
- long timeo, int *err)
++static int tcp_msg_wait_data(struct sock *sk, struct sk_psock *psock,
++ long timeo)
+ {
+ DEFINE_WAIT_FUNC(wait, woken_wake_function);
+ int ret = 0;
+
+ if (sk->sk_shutdown & RCV_SHUTDOWN)
+ return 1;
+
+ if (!timeo)
+ return ret;
+
+ add_wait_queue(sk_sleep(sk), &wait);
+ sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+ ret = sk_wait_event(sk, &timeo,
+ !list_empty(&psock->ingress_msg) ||
+ !skb_queue_empty(&sk->sk_receive_queue), &wait);
+ sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+ remove_wait_queue(sk_sleep(sk), &wait);
+ return ret;
+ }
+
static int tcp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
int nonblock, int flags, int *addr_len)
{
msg_bytes_ready:
copied = sk_msg_recvmsg(sk, psock, msg, len, flags);
if (!copied) {
- int data, err = 0;
long timeo;
+ int data;
timeo = sock_rcvtimeo(sk, nonblock);
- data = sk_msg_wait_data(sk, psock, timeo);
- data = tcp_msg_wait_data(sk, psock, flags, timeo, &err);
++ data = tcp_msg_wait_data(sk, psock, timeo);
if (data) {
if (!sk_psock_queue_empty(psock))
goto msg_bytes_ready;
return udp_prot.recvmsg(sk, msg, len, noblock, flags, addr_len);
}
-static int udp_msg_wait_data(struct sock *sk, struct sk_psock *psock, int flags,
- long timeo, int *err)
+ static bool udp_sk_has_data(struct sock *sk)
+ {
+ return !skb_queue_empty(&udp_sk(sk)->reader_queue) ||
+ !skb_queue_empty(&sk->sk_receive_queue);
+ }
+
+ static bool psock_has_data(struct sk_psock *psock)
+ {
+ return !skb_queue_empty(&psock->ingress_skb) ||
+ !sk_psock_queue_empty(psock);
+ }
+
+ #define udp_msg_has_data(__sk, __psock) \
+ ({ udp_sk_has_data(__sk) || psock_has_data(__psock); })
+
++static int udp_msg_wait_data(struct sock *sk, struct sk_psock *psock,
++ long timeo)
+ {
+ DEFINE_WAIT_FUNC(wait, woken_wake_function);
+ int ret = 0;
+
+ if (sk->sk_shutdown & RCV_SHUTDOWN)
+ return 1;
+
+ if (!timeo)
+ return ret;
+
+ add_wait_queue(sk_sleep(sk), &wait);
+ sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+ ret = udp_msg_has_data(sk, psock);
+ if (!ret) {
+ wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
+ ret = udp_msg_has_data(sk, psock);
+ }
+ sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+ remove_wait_queue(sk_sleep(sk), &wait);
+ return ret;
+ }
+
static int udp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
int nonblock, int flags, int *addr_len)
{
msg_bytes_ready:
copied = sk_msg_recvmsg(sk, psock, msg, len, flags);
if (!copied) {
- int data, err = 0;
long timeo;
+ int data;
timeo = sock_rcvtimeo(sk, nonblock);
- data = sk_msg_wait_data(sk, psock, timeo);
- data = udp_msg_wait_data(sk, psock, flags, timeo, &err);
++ data = udp_msg_wait_data(sk, psock, timeo);
if (data) {
- if (!sk_psock_queue_empty(psock))
+ if (psock_has_data(psock))
goto msg_bytes_ready;
ret = sk_udp_recvmsg(sk, msg, len, nonblock, flags, addr_len);
goto out;