svcrdma: Use parsed chunk lists to encode Reply transport headers
authorChuck Lever <chuck.lever@oracle.com>
Wed, 17 Jun 2020 15:50:34 +0000 (11:50 -0400)
committerChuck Lever <chuck.lever@oracle.com>
Mon, 30 Nov 2020 18:00:22 +0000 (13:00 -0500)
Refactor: Instead of re-parsing the ingress RPC Call transport
header when constructing the egress RPC Reply transport header, use
the new parsed Write list and Reply chunk, which are version-
agnostic and already XDR decoded.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
include/trace/events/rpcrdma.h
net/sunrpc/xprtrdma/svc_rdma_sendto.c

index 72b941a..5218e0f 100644 (file)
@@ -1447,9 +1447,44 @@ DECLARE_EVENT_CLASS(svcrdma_segment_event,
                                TP_ARGS(handle, length, offset))
 
 DEFINE_SEGMENT_EVENT(send_rseg);
-DEFINE_SEGMENT_EVENT(encode_wseg);
 DEFINE_SEGMENT_EVENT(send_wseg);
 
+TRACE_EVENT(svcrdma_encode_wseg,
+       TP_PROTO(
+               const struct svc_rdma_send_ctxt *ctxt,
+               u32 segno,
+               u32 handle,
+               u32 length,
+               u64 offset
+       ),
+
+       TP_ARGS(ctxt, segno, handle, length, offset),
+
+       TP_STRUCT__entry(
+               __field(u32, cq_id)
+               __field(int, completion_id)
+               __field(u32, segno)
+               __field(u32, handle)
+               __field(u32, length)
+               __field(u64, offset)
+       ),
+
+       TP_fast_assign(
+               __entry->cq_id = ctxt->sc_cid.ci_queue_id;
+               __entry->completion_id = ctxt->sc_cid.ci_completion_id;
+               __entry->segno = segno;
+               __entry->handle = handle;
+               __entry->length = length;
+               __entry->offset = offset;
+       ),
+
+       TP_printk("cq_id=%u cid=%d segno=%u %u@0x%016llx:0x%08x",
+               __entry->cq_id, __entry->completion_id,
+               __entry->segno, __entry->length,
+               (unsigned long long)__entry->offset, __entry->handle
+       )
+);
+
 TRACE_EVENT(svcrdma_decode_rseg,
        TP_PROTO(
                const struct rpc_rdma_cid *cid,
index 9a6d3de..1fdbbad 100644 (file)
@@ -358,49 +358,42 @@ static ssize_t svc_rdma_encode_read_list(struct svc_rdma_send_ctxt *sctxt)
 
 /**
  * svc_rdma_encode_write_segment - Encode one Write segment
- * @src: matching Write chunk in the RPC Call header
  * @sctxt: Send context for the RPC Reply
+ * @chunk: Write chunk to push
  * @remaining: remaining bytes of the payload left in the Write chunk
+ * @segno: which segment in the chunk
  *
  * Return values:
  *   On success, returns length in bytes of the Reply XDR buffer
- *   that was consumed by the Write segment
+ *   that was consumed by the Write segment, and updates @remaining
  *   %-EMSGSIZE on XDR buffer overflow
  */
-static ssize_t svc_rdma_encode_write_segment(__be32 *src,
-                                            struct svc_rdma_send_ctxt *sctxt,
-                                            unsigned int *remaining)
+static ssize_t svc_rdma_encode_write_segment(struct svc_rdma_send_ctxt *sctxt,
+                                            const struct svc_rdma_chunk *chunk,
+                                            u32 *remaining, unsigned int segno)
 {
+       const struct svc_rdma_segment *segment = &chunk->ch_segments[segno];
+       const size_t len = rpcrdma_segment_maxsz * sizeof(__be32);
+       u32 length;
        __be32 *p;
-       const size_t len = rpcrdma_segment_maxsz * sizeof(*p);
-       u32 handle, length;
-       u64 offset;
 
        p = xdr_reserve_space(&sctxt->sc_stream, len);
        if (!p)
                return -EMSGSIZE;
 
-       xdr_decode_rdma_segment(src, &handle, &length, &offset);
-
-       if (*remaining < length) {
-               /* segment only partly filled */
-               length = *remaining;
-               *remaining = 0;
-       } else {
-               /* entire segment was consumed */
-               *remaining -= length;
-       }
-       xdr_encode_rdma_segment(p, handle, length, offset);
-
-       trace_svcrdma_encode_wseg(handle, length, offset);
+       length = min_t(u32, *remaining, segment->rs_length);
+       *remaining -= length;
+       xdr_encode_rdma_segment(p, segment->rs_handle, length,
+                               segment->rs_offset);
+       trace_svcrdma_encode_wseg(sctxt, segno, segment->rs_handle, length,
+                                 segment->rs_offset);
        return len;
 }
 
 /**
  * svc_rdma_encode_write_chunk - Encode one Write chunk
- * @src: matching Write chunk in the RPC Call header
  * @sctxt: Send context for the RPC Reply
- * @remaining: size in bytes of the payload in the Write chunk
+ * @chunk: Write chunk to push
  *
  * Copy a Write chunk from the Call transport header to the
  * Reply transport header. Update each segment's length field
@@ -411,33 +404,30 @@ static ssize_t svc_rdma_encode_write_segment(__be32 *src,
  *   that was consumed by the Write chunk
  *   %-EMSGSIZE on XDR buffer overflow
  */
-static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
-                                          struct svc_rdma_send_ctxt *sctxt,
-                                          unsigned int remaining)
+static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
+                                          const struct svc_rdma_chunk *chunk)
 {
-       unsigned int i, nsegs;
+       u32 remaining = chunk->ch_payload_length;
+       unsigned int segno;
        ssize_t len, ret;
 
-       len = 0;
        trace_svcrdma_encode_write_chunk(remaining);
 
-       src++;
+       len = 0;
        ret = xdr_stream_encode_item_present(&sctxt->sc_stream);
        if (ret < 0)
-               return -EMSGSIZE;
+               return ret;
        len += ret;
 
-       nsegs = be32_to_cpup(src++);
-       ret = xdr_stream_encode_u32(&sctxt->sc_stream, nsegs);
+       ret = xdr_stream_encode_u32(&sctxt->sc_stream, chunk->ch_segcount);
        if (ret < 0)
-               return -EMSGSIZE;
+               return ret;
        len += ret;
 
-       for (i = nsegs; i; i--) {
-               ret = svc_rdma_encode_write_segment(src, sctxt, &remaining);
+       for (segno = 0; segno < chunk->ch_segcount; segno++) {
+               ret = svc_rdma_encode_write_segment(sctxt, chunk, &remaining, segno);
                if (ret < 0)
-                       return -EMSGSIZE;
-               src += rpcrdma_segment_maxsz;
+                       return ret;
                len += ret;
        }
 
@@ -449,34 +439,23 @@ static ssize_t svc_rdma_encode_write_chunk(__be32 *src,
  * @rctxt: Reply context with information about the RPC Call
  * @sctxt: Send context for the RPC Reply
  *
- * The client provides a Write chunk list in the Call message. Fill
- * in the segments in the first Write chunk in the Reply's transport
- * header with the number of bytes consumed in each segment.
- * Remaining chunks are returned unused.
- *
- * Assumptions:
- *  - Client has provided only one Write chunk
- *
  * Return values:
  *   On success, returns length in bytes of the Reply XDR buffer
  *   that was consumed by the Reply's Write list
  *   %-EMSGSIZE on XDR buffer overflow
  */
-static ssize_t
-svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
-                          struct svc_rdma_send_ctxt *sctxt)
+static ssize_t svc_rdma_encode_write_list(struct svc_rdma_recv_ctxt *rctxt,
+                                         struct svc_rdma_send_ctxt *sctxt)
 {
        struct svc_rdma_chunk *chunk;
        ssize_t len, ret;
 
        len = 0;
-       if (rctxt->rc_write_list) {
-               chunk = pcl_first_chunk(&rctxt->rc_write_pcl);
-               ret = svc_rdma_encode_write_chunk(rctxt->rc_write_list, sctxt,
-                                                 chunk->ch_payload_length);
+       pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
+               ret = svc_rdma_encode_write_chunk(sctxt, chunk);
                if (ret < 0)
                        return ret;
-               len = ret;
+               len += ret;
        }
 
        /* Terminate the Write list */
@@ -493,24 +472,28 @@ svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
  * @sctxt: Send context for the RPC Reply
  * @length: size in bytes of the payload in the Reply chunk
  *
- * Assumptions:
- * - Reply can always fit in the client-provided Reply chunk
- *
  * Return values:
  *   On success, returns length in bytes of the Reply XDR buffer
  *   that was consumed by the Reply's Reply chunk
  *   %-EMSGSIZE on XDR buffer overflow
+ *   %-E2BIG if the RPC message is larger than the Reply chunk
  */
 static ssize_t
-svc_rdma_encode_reply_chunk(const struct svc_rdma_recv_ctxt *rctxt,
+svc_rdma_encode_reply_chunk(struct svc_rdma_recv_ctxt *rctxt,
                            struct svc_rdma_send_ctxt *sctxt,
                            unsigned int length)
 {
-       if (!rctxt->rc_reply_chunk)
+       struct svc_rdma_chunk *chunk;
+
+       if (pcl_is_empty(&rctxt->rc_reply_pcl))
                return xdr_stream_encode_item_absent(&sctxt->sc_stream);
 
-       return svc_rdma_encode_write_chunk(rctxt->rc_reply_chunk, sctxt,
-                                          length);
+       chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
+       if (length > chunk->ch_length)
+               return -E2BIG;
+
+       chunk->ch_payload_length = length;
+       return svc_rdma_encode_write_chunk(sctxt, chunk);
 }
 
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
@@ -928,7 +911,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
        *p++ = *rdma_argp;
        *p++ = *(rdma_argp + 1);
        *p++ = rdma->sc_fc_credits;
-       *p = rctxt->rc_reply_chunk ? rdma_nomsg : rdma_msg;
+       *p = pcl_is_empty(&rctxt->rc_reply_pcl) ? rdma_msg : rdma_nomsg;
 
        if (svc_rdma_encode_read_list(sctxt) < 0)
                goto err0;