Documentation: ACPI: move apei/einj.txt to firmware-guide/acpi and convert to reST
[linux-2.6-microblaze.git] / net / sunrpc / xprtsock.c
index 7754aa3..9359539 100644 (file)
@@ -50,6 +50,7 @@
 #include <linux/bvec.h>
 #include <linux/highmem.h>
 #include <linux/uio.h>
+#include <linux/sched/mm.h>
 
 #include <trace/events/sunrpc.h>
 
@@ -404,8 +405,8 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
        size_t want, seek_init = seek, offset = 0;
        ssize_t ret;
 
-       if (seek < buf->head[0].iov_len) {
-               want = min_t(size_t, count, buf->head[0].iov_len);
+       want = min_t(size_t, count, buf->head[0].iov_len);
+       if (seek < want) {
                ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek);
                if (ret <= 0)
                        goto sock_err;
@@ -416,13 +417,13 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
                        goto out;
                seek = 0;
        } else {
-               seek -= buf->head[0].iov_len;
-               offset += buf->head[0].iov_len;
+               seek -= want;
+               offset += want;
        }
 
        want = xs_alloc_sparse_pages(buf,
                        min_t(size_t, count - offset, buf->page_len),
-                       GFP_NOWAIT);
+                       GFP_KERNEL);
        if (seek < want) {
                ret = xs_read_bvec(sock, msg, flags, buf->bvec,
                                xdr_buf_pagecount(buf),
@@ -442,8 +443,8 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
                offset += want;
        }
 
-       if (seek < buf->tail[0].iov_len) {
-               want = min_t(size_t, count - offset, buf->tail[0].iov_len);
+       want = min_t(size_t, count - offset, buf->tail[0].iov_len);
+       if (seek < want) {
                ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek);
                if (ret <= 0)
                        goto sock_err;
@@ -452,8 +453,8 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
                        goto out;
                if (ret != want)
                        goto out;
-       } else
-               offset += buf->tail[0].iov_len;
+       } else if (offset < seek_init)
+               offset = seek_init;
        ret = -EMSGSIZE;
 out:
        *read = offset - seek_init;
@@ -481,6 +482,14 @@ xs_read_stream_request_done(struct sock_xprt *transport)
        return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
 }
 
+static void
+xs_read_stream_check_eor(struct sock_xprt *transport,
+               struct msghdr *msg)
+{
+       if (xs_read_stream_request_done(transport))
+               msg->msg_flags |= MSG_EOR;
+}
+
 static ssize_t
 xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
                int flags, struct rpc_rqst *req)
@@ -492,17 +501,21 @@ xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
        xs_read_header(transport, buf);
 
        want = transport->recv.len - transport->recv.offset;
-       ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
-                       transport->recv.copied + want, transport->recv.copied,
-                       &read);
-       transport->recv.offset += read;
-       transport->recv.copied += read;
-       if (transport->recv.offset == transport->recv.len) {
-               if (xs_read_stream_request_done(transport))
-                       msg->msg_flags |= MSG_EOR;
-               return read;
+       if (want != 0) {
+               ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
+                               transport->recv.copied + want,
+                               transport->recv.copied,
+                               &read);
+               transport->recv.offset += read;
+               transport->recv.copied += read;
        }
 
+       if (transport->recv.offset == transport->recv.len)
+               xs_read_stream_check_eor(transport, msg);
+
+       if (want == 0)
+               return 0;
+
        switch (ret) {
        default:
                break;
@@ -655,13 +668,35 @@ out_err:
        return ret != 0 ? ret : -ESHUTDOWN;
 }
 
+static __poll_t xs_poll_socket(struct sock_xprt *transport)
+{
+       return transport->sock->ops->poll(transport->file, transport->sock,
+                       NULL);
+}
+
+static bool xs_poll_socket_readable(struct sock_xprt *transport)
+{
+       __poll_t events = xs_poll_socket(transport);
+
+       return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP);
+}
+
+static void xs_poll_check_readable(struct sock_xprt *transport)
+{
+
+       clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+       if (!xs_poll_socket_readable(transport))
+               return;
+       if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+               queue_work(xprtiod_workqueue, &transport->recv_worker);
+}
+
 static void xs_stream_data_receive(struct sock_xprt *transport)
 {
        size_t read = 0;
        ssize_t ret = 0;
 
        mutex_lock(&transport->recv_mutex);
-       clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
        if (transport->sock == NULL)
                goto out;
        for (;;) {
@@ -671,6 +706,10 @@ static void xs_stream_data_receive(struct sock_xprt *transport)
                read += ret;
                cond_resched();
        }
+       if (ret == -ESHUTDOWN)
+               kernel_sock_shutdown(transport->sock, SHUT_RDWR);
+       else
+               xs_poll_check_readable(transport);
 out:
        mutex_unlock(&transport->recv_mutex);
        trace_xs_stream_read_data(&transport->xprt, ret, read);
@@ -680,7 +719,10 @@ static void xs_stream_data_receive_workfn(struct work_struct *work)
 {
        struct sock_xprt *transport =
                container_of(work, struct sock_xprt, recv_worker);
+       unsigned int pflags = memalloc_nofs_save();
+
        xs_stream_data_receive(transport);
+       memalloc_nofs_restore(pflags);
 }
 
 static void
@@ -690,65 +732,65 @@ xs_stream_reset_connect(struct sock_xprt *transport)
        transport->recv.len = 0;
        transport->recv.copied = 0;
        transport->xmit.offset = 0;
+}
+
+static void
+xs_stream_start_connect(struct sock_xprt *transport)
+{
        transport->xprt.stat.connect_count++;
        transport->xprt.stat.connect_start = jiffies;
 }
 
 #define XS_SENDMSG_FLAGS       (MSG_DONTWAIT | MSG_NOSIGNAL)
 
-static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
+static int xs_sendmsg(struct socket *sock, struct msghdr *msg, size_t seek)
 {
-       struct msghdr msg = {
-               .msg_name       = addr,
-               .msg_namelen    = addrlen,
-               .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
-       };
-       struct kvec iov = {
-               .iov_base       = vec->iov_base + base,
-               .iov_len        = vec->iov_len - base,
-       };
+       if (seek)
+               iov_iter_advance(&msg->msg_iter, seek);
+       return sock_sendmsg(sock, msg);
+}
 
-       if (iov.iov_len != 0)
-               return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
-       return kernel_sendmsg(sock, &msg, NULL, 0, 0);
+static int xs_send_kvec(struct socket *sock, struct msghdr *msg, struct kvec *vec, size_t seek)
+{
+       iov_iter_kvec(&msg->msg_iter, WRITE, vec, 1, vec->iov_len);
+       return xs_sendmsg(sock, msg, seek);
 }
 
-static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy, int *sent_p)
+static int xs_send_pagedata(struct socket *sock, struct msghdr *msg, struct xdr_buf *xdr, size_t base)
 {
-       ssize_t (*do_sendpage)(struct socket *sock, struct page *page,
-                       int offset, size_t size, int flags);
-       struct page **ppage;
-       unsigned int remainder;
        int err;
 
-       remainder = xdr->page_len - base;
-       base += xdr->page_base;
-       ppage = xdr->pages + (base >> PAGE_SHIFT);
-       base &= ~PAGE_MASK;
-       do_sendpage = sock->ops->sendpage;
-       if (!zerocopy)
-               do_sendpage = sock_no_sendpage;
-       for(;;) {
-               unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
-               int flags = XS_SENDMSG_FLAGS;
+       err = xdr_alloc_bvec(xdr, GFP_KERNEL);
+       if (err < 0)
+               return err;
 
-               remainder -= len;
-               if (more)
-                       flags |= MSG_MORE;
-               if (remainder != 0)
-                       flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE;
-               err = do_sendpage(sock, *ppage, base, len, flags);
-               if (remainder == 0 || err != len)
-                       break;
-               *sent_p += err;
-               ppage++;
-               base = 0;
-       }
-       if (err > 0) {
-               *sent_p += err;
-               err = 0;
-       }
-       return err;
+       iov_iter_bvec(&msg->msg_iter, WRITE, xdr->bvec,
+                       xdr_buf_pagecount(xdr),
+                       xdr->page_len + xdr->page_base);
+       return xs_sendmsg(sock, msg, base + xdr->page_base);
+}
+
+#define xs_record_marker_len() sizeof(rpc_fraghdr)
+
+/* Common case:
+ *  - stream transport
+ *  - sending from byte 0 of the message
+ *  - the message is wholly contained in @xdr's head iovec
+ */
+static int xs_send_rm_and_kvec(struct socket *sock, struct msghdr *msg,
+               rpc_fraghdr marker, struct kvec *vec, size_t base)
+{
+       struct kvec iov[2] = {
+               [0] = {
+                       .iov_base       = &marker,
+                       .iov_len        = sizeof(marker)
+               },
+               [1] = *vec,
+       };
+       size_t len = iov[0].iov_len + iov[1].iov_len;
+
+       iov_iter_kvec(&msg->msg_iter, WRITE, iov, 2, len);
+       return xs_sendmsg(sock, msg, base);
 }
 
 /**
@@ -758,49 +800,60 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
  * @addrlen: UDP only -- length of destination address
  * @xdr: buffer containing this request
  * @base: starting position in the buffer
- * @zerocopy: true if it is safe to use sendpage()
+ * @rm: stream record marker field
  * @sent_p: return the total number of bytes successfully queued for sending
  *
  */
-static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy, int *sent_p)
+static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, rpc_fraghdr rm, int *sent_p)
 {
-       unsigned int remainder = xdr->len - base;
+       struct msghdr msg = {
+               .msg_name = addr,
+               .msg_namelen = addrlen,
+               .msg_flags = XS_SENDMSG_FLAGS | MSG_MORE,
+       };
+       unsigned int rmsize = rm ? sizeof(rm) : 0;
+       unsigned int remainder = rmsize + xdr->len - base;
+       unsigned int want;
        int err = 0;
-       int sent = 0;
 
        if (unlikely(!sock))
                return -ENOTSOCK;
 
-       if (base != 0) {
-               addr = NULL;
-               addrlen = 0;
-       }
-
-       if (base < xdr->head[0].iov_len || addr != NULL) {
-               unsigned int len = xdr->head[0].iov_len - base;
+       want = xdr->head[0].iov_len + rmsize;
+       if (base < want) {
+               unsigned int len = want - base;
                remainder -= len;
-               err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
+               if (remainder == 0)
+                       msg.msg_flags &= ~MSG_MORE;
+               if (rmsize)
+                       err = xs_send_rm_and_kvec(sock, &msg, rm,
+                                       &xdr->head[0], base);
+               else
+                       err = xs_send_kvec(sock, &msg, &xdr->head[0], base);
                if (remainder == 0 || err != len)
                        goto out;
                *sent_p += err;
                base = 0;
        } else
-               base -= xdr->head[0].iov_len;
+               base -= want;
 
        if (base < xdr->page_len) {
                unsigned int len = xdr->page_len - base;
                remainder -= len;
-               err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy, &sent);
-               *sent_p += sent;
-               if (remainder == 0 || sent != len)
+               if (remainder == 0)
+                       msg.msg_flags &= ~MSG_MORE;
+               err = xs_send_pagedata(sock, &msg, xdr, base);
+               if (remainder == 0 || err != len)
                        goto out;
+               *sent_p += err;
                base = 0;
        } else
                base -= xdr->page_len;
 
        if (base >= xdr->tail[0].iov_len)
                return 0;
-       err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
+       msg.msg_flags &= ~MSG_MORE;
+       err = xs_send_kvec(sock, &msg, &xdr->tail[0], base);
 out:
        if (err > 0) {
                *sent_p += err;
@@ -856,7 +909,7 @@ static int xs_nospace(struct rpc_rqst *req)
 static void
 xs_stream_prepare_request(struct rpc_rqst *req)
 {
-       req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_NOIO);
+       req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_KERNEL);
 }
 
 /*
@@ -870,13 +923,14 @@ xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
 }
 
 /*
- * Construct a stream transport record marker in @buf.
+ * Return the stream record marker field for a record of length < 2^31-1
  */
-static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
+static rpc_fraghdr
+xs_stream_record_marker(struct xdr_buf *xdr)
 {
-       u32 reclen = buf->len - sizeof(rpc_fraghdr);
-       rpc_fraghdr *base = buf->head[0].iov_base;
-       *base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
+       if (!xdr->len)
+               return 0;
+       return cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | (u32)xdr->len);
 }
 
 /**
@@ -905,15 +959,14 @@ static int xs_local_send_request(struct rpc_rqst *req)
                return -ENOTCONN;
        }
 
-       xs_encode_stream_record_marker(&req->rq_snd_buf);
-
        xs_pktdump("packet data:",
                        req->rq_svec->iov_base, req->rq_svec->iov_len);
 
        req->rq_xtime = ktime_get();
        status = xs_sendpages(transport->sock, NULL, 0, xdr,
                              transport->xmit.offset,
-                             true, &sent);
+                             xs_stream_record_marker(xdr),
+                             &sent);
        dprintk("RPC:       %s(%u) = %d\n",
                        __func__, xdr->len - transport->xmit.offset, status);
 
@@ -925,7 +978,6 @@ static int xs_local_send_request(struct rpc_rqst *req)
                req->rq_bytes_sent = transport->xmit.offset;
                if (likely(req->rq_bytes_sent >= req->rq_slen)) {
                        req->rq_xmit_bytes_sent += transport->xmit.offset;
-                       req->rq_bytes_sent = 0;
                        transport->xmit.offset = 0;
                        return 0;
                }
@@ -981,7 +1033,7 @@ static int xs_udp_send_request(struct rpc_rqst *req)
 
        req->rq_xtime = ktime_get();
        status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
-                             xdr, 0, true, &sent);
+                             xdr, 0, 0, &sent);
 
        dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
                        xdr->len, status);
@@ -1045,7 +1097,6 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
        struct rpc_xprt *xprt = req->rq_xprt;
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
        struct xdr_buf *xdr = &req->rq_snd_buf;
-       bool zerocopy = true;
        bool vm_wait = false;
        int status;
        int sent;
@@ -1057,17 +1108,9 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
                return -ENOTCONN;
        }
 
-       xs_encode_stream_record_marker(&req->rq_snd_buf);
-
        xs_pktdump("packet data:",
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);
-       /* Don't use zero copy if this is a resend. If the RPC call
-        * completes while the socket holds a reference to the pages,
-        * then we may end up resending corrupted data.
-        */
-       if (req->rq_task->tk_flags & RPC_TASK_SENT)
-               zerocopy = false;
 
        if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
                xs_tcp_set_socket_timeouts(xprt, transport->sock);
@@ -1080,7 +1123,8 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
                sent = 0;
                status = xs_sendpages(transport->sock, NULL, 0, xdr,
                                      transport->xmit.offset,
-                                     zerocopy, &sent);
+                                     xs_stream_record_marker(xdr),
+                                     &sent);
 
                dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
                                xdr->len - transport->xmit.offset, status);
@@ -1091,7 +1135,6 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
                req->rq_bytes_sent = transport->xmit.offset;
                if (likely(req->rq_bytes_sent >= req->rq_slen)) {
                        req->rq_xmit_bytes_sent += transport->xmit.offset;
-                       req->rq_bytes_sent = 0;
                        transport->xmit.offset = 0;
                        return 0;
                }
@@ -1211,6 +1254,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
        struct socket *sock = transport->sock;
        struct sock *sk = transport->inet;
        struct rpc_xprt *xprt = &transport->xprt;
+       struct file *filp = transport->file;
 
        if (sk == NULL)
                return;
@@ -1224,6 +1268,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
        write_lock_bh(&sk->sk_callback_lock);
        transport->inet = NULL;
        transport->sock = NULL;
+       transport->file = NULL;
 
        sk->sk_user_data = NULL;
 
@@ -1231,10 +1276,12 @@ static void xs_reset_transport(struct sock_xprt *transport)
        xprt_clear_connected(xprt);
        write_unlock_bh(&sk->sk_callback_lock);
        xs_sock_reset_connection_flags(xprt);
+       /* Reset stream record info */
+       xs_stream_reset_connect(transport);
        mutex_unlock(&transport->recv_mutex);
 
        trace_rpc_socket_close(xprt, sock);
-       sock_release(sock);
+       fput(filp);
 
        xprt_disconnect_done(xprt);
 }
@@ -1358,7 +1405,6 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
        int err;
 
        mutex_lock(&transport->recv_mutex);
-       clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
        sk = transport->inet;
        if (sk == NULL)
                goto out;
@@ -1370,6 +1416,7 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
                consume_skb(skb);
                cond_resched();
        }
+       xs_poll_check_readable(transport);
 out:
        mutex_unlock(&transport->recv_mutex);
 }
@@ -1378,7 +1425,10 @@ static void xs_udp_data_receive_workfn(struct work_struct *work)
 {
        struct sock_xprt *transport =
                container_of(work, struct sock_xprt, recv_worker);
+       unsigned int pflags = memalloc_nofs_save();
+
        xs_udp_data_receive(transport);
+       memalloc_nofs_restore(pflags);
 }
 
 /**
@@ -1826,6 +1876,7 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt,
                struct sock_xprt *transport, int family, int type,
                int protocol, bool reuseport)
 {
+       struct file *filp;
        struct socket *sock;
        int err;
 
@@ -1846,6 +1897,11 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt,
                goto out;
        }
 
+       filp = sock_alloc_file(sock, O_NONBLOCK, NULL);
+       if (IS_ERR(filp))
+               return ERR_CAST(filp);
+       transport->file = filp;
+
        return sock;
 out:
        return ERR_PTR(err);
@@ -1869,7 +1925,6 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
                sk->sk_write_space = xs_udp_write_space;
                sock_set_flag(sk, SOCK_FASYNC);
                sk->sk_error_report = xs_error_report;
-               sk->sk_allocation = GFP_NOIO;
 
                xprt_clear_connected(xprt);
 
@@ -1880,7 +1935,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
                write_unlock_bh(&sk->sk_callback_lock);
        }
 
-       xs_stream_reset_connect(transport);
+       xs_stream_start_connect(transport);
 
        return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
 }
@@ -1892,6 +1947,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
 static int xs_local_setup_socket(struct sock_xprt *transport)
 {
        struct rpc_xprt *xprt = &transport->xprt;
+       struct file *filp;
        struct socket *sock;
        int status = -EIO;
 
@@ -1904,6 +1960,13 @@ static int xs_local_setup_socket(struct sock_xprt *transport)
        }
        xs_reclassify_socket(AF_LOCAL, sock);
 
+       filp = sock_alloc_file(sock, O_NONBLOCK, NULL);
+       if (IS_ERR(filp)) {
+               status = PTR_ERR(filp);
+               goto out;
+       }
+       transport->file = filp;
+
        dprintk("RPC:       worker connecting xprt %p via AF_LOCAL to %s\n",
                        xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
 
@@ -2057,7 +2120,6 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
                sk->sk_data_ready = xs_data_ready;
                sk->sk_write_space = xs_udp_write_space;
                sock_set_flag(sk, SOCK_FASYNC);
-               sk->sk_allocation = GFP_NOIO;
 
                xprt_set_connected(xprt);
 
@@ -2220,7 +2282,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
                sk->sk_write_space = xs_tcp_write_space;
                sock_set_flag(sk, SOCK_FASYNC);
                sk->sk_error_report = xs_error_report;
-               sk->sk_allocation = GFP_NOIO;
 
                /* socket options */
                sock_reset_flag(sk, SOCK_LINGER);
@@ -2240,8 +2301,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 
        xs_set_memalloc(xprt);
 
-       /* Reset TCP record info */
-       xs_stream_reset_connect(transport);
+       xs_stream_start_connect(transport);
 
        /* Tell the socket layer to start connecting... */
        set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
@@ -2534,26 +2594,35 @@ static int bc_sendto(struct rpc_rqst *req)
 {
        int len;
        struct xdr_buf *xbufp = &req->rq_snd_buf;
-       struct rpc_xprt *xprt = req->rq_xprt;
        struct sock_xprt *transport =
-                               container_of(xprt, struct sock_xprt, xprt);
-       struct socket *sock = transport->sock;
+                       container_of(req->rq_xprt, struct sock_xprt, xprt);
        unsigned long headoff;
        unsigned long tailoff;
+       struct page *tailpage;
+       struct msghdr msg = {
+               .msg_flags      = MSG_MORE
+       };
+       rpc_fraghdr marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT |
+                                        (u32)xbufp->len);
+       struct kvec iov = {
+               .iov_base       = &marker,
+               .iov_len        = sizeof(marker),
+       };
 
-       xs_encode_stream_record_marker(xbufp);
+       len = kernel_sendmsg(transport->sock, &msg, &iov, 1, iov.iov_len);
+       if (len != iov.iov_len)
+               return -EAGAIN;
 
+       tailpage = NULL;
+       if (xbufp->tail[0].iov_len)
+               tailpage = virt_to_page(xbufp->tail[0].iov_base);
        tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
        headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
-       len = svc_send_common(sock, xbufp,
+       len = svc_send_common(transport->sock, xbufp,
                              virt_to_page(xbufp->head[0].iov_base), headoff,
-                             xbufp->tail[0].iov_base, tailoff);
-
-       if (len != xbufp->len) {
-               printk(KERN_NOTICE "Error sending entire callback!\n");
-               len = -EAGAIN;
-       }
-
+                             tailpage, tailoff);
+       if (len != xbufp->len)
+               return -EAGAIN;
        return len;
 }
 
@@ -2793,7 +2862,6 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
        transport = container_of(xprt, struct sock_xprt, xprt);
 
        xprt->prot = 0;
-       xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
 
        xprt->bind_timeout = XS_BIND_TO;
@@ -2862,7 +2930,6 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
        transport = container_of(xprt, struct sock_xprt, xprt);
 
        xprt->prot = IPPROTO_UDP;
-       xprt->tsh_size = 0;
        /* XXX: header size can vary due to auth type, IPv6, etc. */
        xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
 
@@ -2942,7 +3009,6 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
        transport = container_of(xprt, struct sock_xprt, xprt);
 
        xprt->prot = IPPROTO_TCP;
-       xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
 
        xprt->bind_timeout = XS_BIND_TO;
@@ -3015,7 +3081,6 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
        transport = container_of(xprt, struct sock_xprt, xprt);
 
        xprt->prot = IPPROTO_TCP;
-       xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
        xprt->timeout = &xs_tcp_default_timeout;