SUNRPC: Fix socket waits for write buffer space
author Trond Myklebust <trond.myklebust@hammerspace.com>
Tue, 15 Mar 2022 01:02:10 +0000 (21:02 -0400)
committer Trond Myklebust <trond.myklebust@hammerspace.com>
Tue, 22 Mar 2022 19:52:55 +0000 (15:52 -0400)
The socket layer requires that we use the socket lock to protect changes
to the sock->sk_write_pending field and others.

Reported-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
net/sunrpc/xprtsock.c

index d2bf3b4..68eee35 100644 (file)
@@ -764,12 +764,12 @@ xs_stream_start_connect(struct sock_xprt *transport)
 /**
  * xs_nospace - handle transmit was incomplete
  * @req: pointer to RPC request
+ * @transport: pointer to struct sock_xprt
  *
  */
-static int xs_nospace(struct rpc_rqst *req)
+static int xs_nospace(struct rpc_rqst *req, struct sock_xprt *transport)
 {
-       struct rpc_xprt *xprt = req->rq_xprt;
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+       struct rpc_xprt *xprt = &transport->xprt;
        struct sock *sk = transport->inet;
        int ret = -EAGAIN;
 
@@ -780,25 +780,49 @@ static int xs_nospace(struct rpc_rqst *req)
 
        /* Don't race with disconnect */
        if (xprt_connected(xprt)) {
+               struct socket_wq *wq;
+
+               rcu_read_lock();
+               wq = rcu_dereference(sk->sk_wq);
+               set_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags);
+               rcu_read_unlock();
+
                /* wait for more buffer space */
+               set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                sk->sk_write_pending++;
                xprt_wait_for_buffer_space(xprt);
        } else
                ret = -ENOTCONN;
 
        spin_unlock(&xprt->transport_lock);
+       return ret;
+}
 
-       /* Race breaker in case memory is freed before above code is called */
-       if (ret == -EAGAIN) {
-               struct socket_wq *wq;
+static int xs_sock_nospace(struct rpc_rqst *req)
+{
+       struct sock_xprt *transport =
+               container_of(req->rq_xprt, struct sock_xprt, xprt);
+       struct sock *sk = transport->inet;
+       int ret = -EAGAIN;
 
-               rcu_read_lock();
-               wq = rcu_dereference(sk->sk_wq);
-               set_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags);
-               rcu_read_unlock();
+       lock_sock(sk);
+       if (!sock_writeable(sk))
+               ret = xs_nospace(req, transport);
+       release_sock(sk);
+       return ret;
+}
 
-               sk->sk_write_space(sk);
-       }
+static int xs_stream_nospace(struct rpc_rqst *req)
+{
+       struct sock_xprt *transport =
+               container_of(req->rq_xprt, struct sock_xprt, xprt);
+       struct sock *sk = transport->inet;
+       int ret = -EAGAIN;
+
+       lock_sock(sk);
+       if (!sk_stream_memory_free(sk))
+               ret = xs_nospace(req, transport);
+       release_sock(sk);
        return ret;
 }
 
@@ -888,7 +912,7 @@ static int xs_local_send_request(struct rpc_rqst *req)
        case -ENOBUFS:
                break;
        case -EAGAIN:
-               status = xs_nospace(req);
+               status = xs_stream_nospace(req);
                break;
        default:
                dprintk("RPC:       sendmsg returned unrecognized error %d\n",
@@ -964,7 +988,7 @@ process_status:
                /* Should we call xs_close() here? */
                break;
        case -EAGAIN:
-               status = xs_nospace(req);
+               status = xs_sock_nospace(req);
                break;
        case -ENETUNREACH:
        case -ENOBUFS:
@@ -1086,7 +1110,7 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
                /* Should we call xs_close() here? */
                break;
        case -EAGAIN:
-               status = xs_nospace(req);
+               status = xs_stream_nospace(req);
                break;
        case -ECONNRESET:
        case -ECONNREFUSED: