Merge tag 'nfsd-4.12' of git://linux-nfs.org/~bfields/linux
author Linus Torvalds <torvalds@linux-foundation.org>
Wed, 10 May 2017 20:29:23 +0000 (13:29 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
Wed, 10 May 2017 20:29:23 +0000 (13:29 -0700)
Pull nfsd updates from Bruce Fields:
 "Another RDMA update from Chuck Lever, and a bunch of miscellaneous
  bugfixes"

* tag 'nfsd-4.12' of git://linux-nfs.org/~bfields/linux: (26 commits)
  nfsd: Fix up the "supattr_exclcreat" attributes
  nfsd: encoders mustn't use uninitialized values in error cases
  nfsd: fix undefined behavior in nfsd4_layout_verify
  lockd: fix lockd shutdown race
  NFSv4: Fix callback server shutdown
  SUNRPC: Refactor svc_set_num_threads()
  NFSv4.x/callback: Create the callback service through svc_create_pooled
  lockd: remove redundant check on block
  svcrdma: Clean out old XDR encoders
  svcrdma: Remove the req_map cache
  svcrdma: Remove unused RDMA Write completion handler
  svcrdma: Reduce size of sge array in struct svc_rdma_op_ctxt
  svcrdma: Clean up RPC-over-RDMA backchannel reply processing
  svcrdma: Report Write/Reply chunk overruns
  svcrdma: Clean up RDMA_ERROR path
  svcrdma: Use rdma_rw API in RPC reply path
  svcrdma: Introduce local rdma_rw API helpers
  svcrdma: Clean up svc_rdma_get_inv_rkey()
  svcrdma: Add helper to save pages under I/O
  svcrdma: Eliminate RPCRDMA_SQ_DEPTH_MULT
  ...

23 files changed:
fs/lockd/svc.c
fs/lockd/svclock.c
fs/nfs/callback.c
fs/nfsd/nfs3xdr.c
fs/nfsd/nfs4proc.c
fs/nfsd/nfs4state.c
fs/nfsd/nfs4xdr.c
fs/nfsd/nfsxdr.c
fs/nfsd/vfs.c
include/linux/sunrpc/rpc_rdma.h
include/linux/sunrpc/svc.h
include/linux/sunrpc/svc_rdma.h
include/uapi/linux/nfsd/cld.h
net/sunrpc/Kconfig
net/sunrpc/svc.c
net/sunrpc/xprtrdma/Makefile
net/sunrpc/xprtrdma/svc_rdma.c
net/sunrpc/xprtrdma/svc_rdma_backchannel.c
net/sunrpc/xprtrdma/svc_rdma_marshal.c
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
net/sunrpc/xprtrdma/svc_rdma_rw.c [new file with mode: 0644]
net/sunrpc/xprtrdma/svc_rdma_sendto.c
net/sunrpc/xprtrdma/svc_rdma_transport.c

index e7c8b9c..5d481e8 100644 (file)
@@ -132,6 +132,8 @@ lockd(void *vrqstp)
 {
        int             err = 0;
        struct svc_rqst *rqstp = vrqstp;
+       struct net *net = &init_net;
+       struct lockd_net *ln = net_generic(net, lockd_net_id);
 
        /* try_to_freeze() is called from svc_recv() */
        set_freezable();
@@ -176,6 +178,8 @@ lockd(void *vrqstp)
        if (nlmsvc_ops)
                nlmsvc_invalidate_all();
        nlm_shutdown_hosts();
+       cancel_delayed_work_sync(&ln->grace_period_end);
+       locks_end_grace(&ln->lockd_manager);
        return 0;
 }
 
@@ -270,8 +274,6 @@ static void lockd_down_net(struct svc_serv *serv, struct net *net)
        if (ln->nlmsvc_users) {
                if (--ln->nlmsvc_users == 0) {
                        nlm_shutdown_hosts_net(net);
-                       cancel_delayed_work_sync(&ln->grace_period_end);
-                       locks_end_grace(&ln->lockd_manager);
                        svc_shutdown_net(serv, net);
                        dprintk("lockd_down_net: per-net data destroyed; net=%p\n", net);
                }
index 5581e02..3507c80 100644 (file)
@@ -870,15 +870,15 @@ nlmsvc_grant_reply(struct nlm_cookie *cookie, __be32 status)
        if (!(block = nlmsvc_find_block(cookie)))
                return;
 
-       if (block) {
-               if (status == nlm_lck_denied_grace_period) {
-                       /* Try again in a couple of seconds */
-                       nlmsvc_insert_block(block, 10 * HZ);
-               } else {
-                       /* Lock is now held by client, or has been rejected.
-                        * In both cases, the block should be removed. */
-                       nlmsvc_unlink_block(block);
-               }
+       if (status == nlm_lck_denied_grace_period) {
+               /* Try again in a couple of seconds */
+               nlmsvc_insert_block(block, 10 * HZ);
+       } else {
+               /*
+                * Lock is now held by client, or has been rejected.
+                * In both cases, the block should be removed.
+                */
+               nlmsvc_unlink_block(block);
        }
        nlmsvc_release_block(block);
 }
index 7737745..73a1f92 100644 (file)
@@ -76,7 +76,10 @@ nfs4_callback_svc(void *vrqstp)
 
        set_freezable();
 
-       while (!kthread_should_stop()) {
+       while (!kthread_freezable_should_stop(NULL)) {
+
+               if (signal_pending(current))
+                       flush_signals(current);
                /*
                 * Listen for a request on the socket
                 */
@@ -85,6 +88,8 @@ nfs4_callback_svc(void *vrqstp)
                        continue;
                svc_process(rqstp);
        }
+       svc_exit_thread(rqstp);
+       module_put_and_exit(0);
        return 0;
 }
 
@@ -103,9 +108,10 @@ nfs41_callback_svc(void *vrqstp)
 
        set_freezable();
 
-       while (!kthread_should_stop()) {
-               if (try_to_freeze())
-                       continue;
+       while (!kthread_freezable_should_stop(NULL)) {
+
+               if (signal_pending(current))
+                       flush_signals(current);
 
                prepare_to_wait(&serv->sv_cb_waitq, &wq, TASK_INTERRUPTIBLE);
                spin_lock_bh(&serv->sv_cb_lock);
@@ -121,11 +127,13 @@ nfs41_callback_svc(void *vrqstp)
                                error);
                } else {
                        spin_unlock_bh(&serv->sv_cb_lock);
-                       schedule();
+                       if (!kthread_should_stop())
+                               schedule();
                        finish_wait(&serv->sv_cb_waitq, &wq);
                }
-               flush_signals(current);
        }
+       svc_exit_thread(rqstp);
+       module_put_and_exit(0);
        return 0;
 }
 
@@ -221,14 +229,14 @@ err_bind:
 static struct svc_serv_ops nfs40_cb_sv_ops = {
        .svo_function           = nfs4_callback_svc,
        .svo_enqueue_xprt       = svc_xprt_do_enqueue,
-       .svo_setup              = svc_set_num_threads,
+       .svo_setup              = svc_set_num_threads_sync,
        .svo_module             = THIS_MODULE,
 };
 #if defined(CONFIG_NFS_V4_1)
 static struct svc_serv_ops nfs41_cb_sv_ops = {
        .svo_function           = nfs41_callback_svc,
        .svo_enqueue_xprt       = svc_xprt_do_enqueue,
-       .svo_setup              = svc_set_num_threads,
+       .svo_setup              = svc_set_num_threads_sync,
        .svo_module             = THIS_MODULE,
 };
 
@@ -280,7 +288,7 @@ static struct svc_serv *nfs_callback_create_svc(int minorversion)
                printk(KERN_WARNING "nfs_callback_create_svc: no kthread, %d users??\n",
                        cb_info->users);
 
-       serv = svc_create(&nfs4_callback_program, NFS4_CALLBACK_BUFSIZE, sv_ops);
+       serv = svc_create_pooled(&nfs4_callback_program, NFS4_CALLBACK_BUFSIZE, sv_ops);
        if (!serv) {
                printk(KERN_ERR "nfs_callback_create_svc: create service failed\n");
                return ERR_PTR(-ENOMEM);
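
The callback-thread hunks above all follow one shutdown pattern: loop on kthread_freezable_should_stop(), treat signals as a nudge rather than a stop request, and let the thread itself call svc_exit_thread(). A minimal sketch of that pattern, using the same kernel helpers but an illustrative thread function rather than the kernel's:

	static int example_callback_thread(void *vrqstp)
	{
		struct svc_rqst *rqstp = vrqstp;

		set_freezable();
		while (!kthread_freezable_should_stop(NULL)) {
			/* Stray signals are flushed; shutdown is driven by
			 * kthread_stop() via svc_set_num_threads_sync().
			 */
			if (signal_pending(current))
				flush_signals(current);
			/* ... wait for and process one callback request ... */
		}
		svc_exit_thread(rqstp);
		module_put_and_exit(0);
		return 0;
	}
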
index 4523346..12feac6 100644 (file)
@@ -334,8 +334,11 @@ nfs3svc_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
        if (!p)
                return 0;
        p = xdr_decode_hyper(p, &args->offset);
-
        args->count = ntohl(*p++);
+
+       if (!xdr_argsize_check(rqstp, p))
+               return 0;
+
        len = min(args->count, max_blocksize);
 
        /* set up the kvec */
@@ -349,7 +352,7 @@ nfs3svc_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
                v++;
        }
        args->vlen = v;
-       return xdr_argsize_check(rqstp, p);
+       return 1;
 }
 
 int
@@ -541,9 +544,11 @@ nfs3svc_decode_readlinkargs(struct svc_rqst *rqstp, __be32 *p,
        p = decode_fh(p, &args->fh);
        if (!p)
                return 0;
+       if (!xdr_argsize_check(rqstp, p))
+               return 0;
        args->buffer = page_address(*(rqstp->rq_next_page++));
 
-       return xdr_argsize_check(rqstp, p);
+       return 1;
 }
 
 int
@@ -569,10 +574,14 @@ nfs3svc_decode_readdirargs(struct svc_rqst *rqstp, __be32 *p,
        args->verf   = p; p += 2;
        args->dircount = ~0;
        args->count  = ntohl(*p++);
+
+       if (!xdr_argsize_check(rqstp, p))
+               return 0;
+
        args->count  = min_t(u32, args->count, PAGE_SIZE);
        args->buffer = page_address(*(rqstp->rq_next_page++));
 
-       return xdr_argsize_check(rqstp, p);
+       return 1;
 }
 
 int
@@ -590,6 +599,9 @@ nfs3svc_decode_readdirplusargs(struct svc_rqst *rqstp, __be32 *p,
        args->dircount = ntohl(*p++);
        args->count    = ntohl(*p++);
 
+       if (!xdr_argsize_check(rqstp, p))
+               return 0;
+
        len = args->count = min(args->count, max_blocksize);
        while (len > 0) {
                struct page *p = *(rqstp->rq_next_page++);
@@ -597,8 +609,7 @@ nfs3svc_decode_readdirplusargs(struct svc_rqst *rqstp, __be32 *p,
                        args->buffer = page_address(p);
                len -= PAGE_SIZE;
        }
-
-       return xdr_argsize_check(rqstp, p);
+       return 1;
 }
 
 int
index d86031b..c453a19 100644 (file)
@@ -1259,7 +1259,8 @@ nfsd4_layout_verify(struct svc_export *exp, unsigned int layout_type)
                return NULL;
        }
 
-       if (!(exp->ex_layout_types & (1 << layout_type))) {
+       if (layout_type >= LAYOUT_TYPE_MAX ||
+           !(exp->ex_layout_types & (1 << layout_type))) {
                dprintk("%s: layout type %d not supported\n",
                        __func__, layout_type);
                return NULL;
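
The added bound check is not just defensive: shifting by a count at or beyond the width of the layout-type mask is undefined behavior in C, so the comparison has to happen before the shift is evaluated. A standalone sketch of the pattern (user space, illustrative; it assumes a 32-bit unsigned mask, whereas the kernel uses the tighter LAYOUT_TYPE_MAX bound):

	int layout_supported(unsigned int mask, unsigned int layout_type)
	{
		if (layout_type >= 32)	/* check the bound before shifting */
			return 0;
		return (mask & (1u << layout_type)) != 0;
	}
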
index e9ef50a..22002fb 100644 (file)
@@ -1912,28 +1912,15 @@ static void copy_clid(struct nfs4_client *target, struct nfs4_client *source)
        target->cl_clientid.cl_id = source->cl_clientid.cl_id; 
 }
 
-int strdup_if_nonnull(char **target, char *source)
-{
-       if (source) {
-               *target = kstrdup(source, GFP_KERNEL);
-               if (!*target)
-                       return -ENOMEM;
-       } else
-               *target = NULL;
-       return 0;
-}
-
 static int copy_cred(struct svc_cred *target, struct svc_cred *source)
 {
-       int ret;
+       target->cr_principal = kstrdup(source->cr_principal, GFP_KERNEL);
+       target->cr_raw_principal = kstrdup(source->cr_raw_principal,
+                                                               GFP_KERNEL);
+       if ((source->cr_principal && ! target->cr_principal) ||
+           (source->cr_raw_principal && ! target->cr_raw_principal))
+               return -ENOMEM;
 
-       ret = strdup_if_nonnull(&target->cr_principal, source->cr_principal);
-       if (ret)
-               return ret;
-       ret = strdup_if_nonnull(&target->cr_raw_principal,
-                                       source->cr_raw_principal);
-       if (ret)
-               return ret;
        target->cr_flavor = source->cr_flavor;
        target->cr_uid = source->cr_uid;
        target->cr_gid = source->cr_gid;
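
This simplification leans on the fact that kstrdup() returns NULL when handed a NULL source, so a NULL result is an error only if the corresponding source string was non-NULL. A user-space sketch of the same pattern, with strdup_or_null standing in for kstrdup():

	#include <stdlib.h>
	#include <string.h>

	static char *strdup_or_null(const char *s)
	{
		return s ? strdup(s) : NULL;
	}

	int copy_two_principals(const char *a, const char *b, char **ta, char **tb)
	{
		*ta = strdup_or_null(a);
		*tb = strdup_or_null(b);
		if ((a && !*ta) || (b && !*tb))
			return -1;	/* allocation failed, not "source was NULL" */
		return 0;
	}
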
index 33017d6..26780d5 100644 (file)
@@ -2831,9 +2831,14 @@ out_acl:
        }
 #endif /* CONFIG_NFSD_PNFS */
        if (bmval2 & FATTR4_WORD2_SUPPATTR_EXCLCREAT) {
-               status = nfsd4_encode_bitmap(xdr, NFSD_SUPPATTR_EXCLCREAT_WORD0,
-                                                 NFSD_SUPPATTR_EXCLCREAT_WORD1,
-                                                 NFSD_SUPPATTR_EXCLCREAT_WORD2);
+               u32 supp[3];
+
+               memcpy(supp, nfsd_suppattrs[minorversion], sizeof(supp));
+               supp[0] &= NFSD_SUPPATTR_EXCLCREAT_WORD0;
+               supp[1] &= NFSD_SUPPATTR_EXCLCREAT_WORD1;
+               supp[2] &= NFSD_SUPPATTR_EXCLCREAT_WORD2;
+
+               status = nfsd4_encode_bitmap(xdr, supp[0], supp[1], supp[2]);
                if (status)
                        goto out;
        }
@@ -4119,8 +4124,7 @@ nfsd4_encode_getdeviceinfo(struct nfsd4_compoundres *resp, __be32 nfserr,
                struct nfsd4_getdeviceinfo *gdev)
 {
        struct xdr_stream *xdr = &resp->xdr;
-       const struct nfsd4_layout_ops *ops =
-               nfsd4_layout_ops[gdev->gd_layout_type];
+       const struct nfsd4_layout_ops *ops;
        u32 starting_len = xdr->buf->len, needed_len;
        __be32 *p;
 
@@ -4137,6 +4141,7 @@ nfsd4_encode_getdeviceinfo(struct nfsd4_compoundres *resp, __be32 nfserr,
 
        /* If maxcount is 0 then just update notifications */
        if (gdev->gd_maxcount != 0) {
+               ops = nfsd4_layout_ops[gdev->gd_layout_type];
                nfserr = ops->encode_getdeviceinfo(xdr, gdev);
                if (nfserr) {
                        /*
@@ -4189,8 +4194,7 @@ nfsd4_encode_layoutget(struct nfsd4_compoundres *resp, __be32 nfserr,
                struct nfsd4_layoutget *lgp)
 {
        struct xdr_stream *xdr = &resp->xdr;
-       const struct nfsd4_layout_ops *ops =
-               nfsd4_layout_ops[lgp->lg_layout_type];
+       const struct nfsd4_layout_ops *ops;
        __be32 *p;
 
        dprintk("%s: err %d\n", __func__, nfserr);
@@ -4213,6 +4217,7 @@ nfsd4_encode_layoutget(struct nfsd4_compoundres *resp, __be32 nfserr,
        *p++ = cpu_to_be32(lgp->lg_seg.iomode);
        *p++ = cpu_to_be32(lgp->lg_layout_type);
 
+       ops = nfsd4_layout_ops[lgp->lg_layout_type];
        nfserr = ops->encode_layoutget(xdr, lgp);
 out:
        kfree(lgp->lg_content);
index de07ff6..6a4947a 100644 (file)
@@ -257,6 +257,9 @@ nfssvc_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
        len = args->count     = ntohl(*p++);
        p++; /* totalcount - unused */
 
+       if (!xdr_argsize_check(rqstp, p))
+               return 0;
+
        len = min_t(unsigned int, len, NFSSVC_MAXBLKSIZE_V2);
 
        /* set up somewhere to store response.
@@ -272,7 +275,7 @@ nfssvc_decode_readargs(struct svc_rqst *rqstp, __be32 *p,
                v++;
        }
        args->vlen = v;
-       return xdr_argsize_check(rqstp, p);
+       return 1;
 }
 
 int
@@ -362,9 +365,11 @@ nfssvc_decode_readlinkargs(struct svc_rqst *rqstp, __be32 *p, struct nfsd_readli
        p = decode_fh(p, &args->fh);
        if (!p)
                return 0;
+       if (!xdr_argsize_check(rqstp, p))
+               return 0;
        args->buffer = page_address(*(rqstp->rq_next_page++));
 
-       return xdr_argsize_check(rqstp, p);
+       return 1;
 }
 
 int
@@ -402,9 +407,11 @@ nfssvc_decode_readdirargs(struct svc_rqst *rqstp, __be32 *p,
        args->cookie = ntohl(*p++);
        args->count  = ntohl(*p++);
        args->count  = min_t(u32, args->count, PAGE_SIZE);
+       if (!xdr_argsize_check(rqstp, p))
+               return 0;
        args->buffer = page_address(*(rqstp->rq_next_page++));
 
-       return xdr_argsize_check(rqstp, p);
+       return 1;
 }
 
 /*
index 9aaf6ca..2be3295 100644 (file)
@@ -94,6 +94,12 @@ nfsd_cross_mnt(struct svc_rqst *rqstp, struct dentry **dpp,
        err = follow_down(&path);
        if (err < 0)
                goto out;
+       if (path.mnt == exp->ex_path.mnt && path.dentry == dentry &&
+           nfsd_mountpoint(dentry, exp) == 2) {
+               /* This is only a mountpoint in some other namespace */
+               path_put(&path);
+               goto out;
+       }
 
        exp2 = rqst_exp_get_by_name(rqstp, &path);
        if (IS_ERR(exp2)) {
@@ -167,16 +173,26 @@ static int nfsd_lookup_parent(struct svc_rqst *rqstp, struct dentry *dparent, st
 /*
  * For nfsd purposes, we treat V4ROOT exports as though there was an
  * export at *every* directory.
+ * We return:
+ * '1' if this dentry *must* be an export point,
+ * '2' if it might be, if there is really a mount here, and
+ * '0' if there is no chance of an export point here.
  */
 int nfsd_mountpoint(struct dentry *dentry, struct svc_export *exp)
 {
-       if (d_mountpoint(dentry))
+       if (!d_inode(dentry))
+               return 0;
+       if (exp->ex_flags & NFSEXP_V4ROOT)
                return 1;
        if (nfsd4_is_junction(dentry))
                return 1;
-       if (!(exp->ex_flags & NFSEXP_V4ROOT))
-               return 0;
-       return d_inode(dentry) != NULL;
+       if (d_mountpoint(dentry))
+               /*
+                * Might only be a mountpoint in a different namespace,
+                * but we need to check.
+                */
+               return 2;
+       return 0;
 }
 
 __be32
index 245fc59..b7e85b3 100644 (file)
@@ -143,6 +143,9 @@ enum rpcrdma_proc {
 #define rdma_done      cpu_to_be32(RDMA_DONE)
 #define rdma_error     cpu_to_be32(RDMA_ERROR)
 
+#define err_vers       cpu_to_be32(ERR_VERS)
+#define err_chunk      cpu_to_be32(ERR_CHUNK)
+
 /*
  * Private extension to RPC-over-RDMA Version One.
  * Message passed during RDMA-CM connection set-up.
index e770abe..9463102 100644 (file)
@@ -336,8 +336,7 @@ xdr_argsize_check(struct svc_rqst *rqstp, __be32 *p)
 {
        char *cp = (char *)p;
        struct kvec *vec = &rqstp->rq_arg.head[0];
-       return cp >= (char*)vec->iov_base
-               && cp <= (char*)vec->iov_base + vec->iov_len;
+       return cp == (char *)vec->iov_base + vec->iov_len;
 }
 
 static inline int
@@ -474,6 +473,7 @@ void                   svc_pool_map_put(void);
 struct svc_serv *  svc_create_pooled(struct svc_program *, unsigned int,
                        struct svc_serv_ops *);
 int               svc_set_num_threads(struct svc_serv *, struct svc_pool *, int);
+int               svc_set_num_threads_sync(struct svc_serv *, struct svc_pool *, int);
 int               svc_pool_stats_open(struct svc_serv *serv, struct file *file);
 void              svc_destroy(struct svc_serv *);
 void              svc_shutdown_net(struct svc_serv *, struct net *);
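
The rewritten xdr_argsize_check() now insists that the decode pointer land exactly on the end of the received argument rather than merely inside it, which is why the NFSv2/v3 decoders above call it before consuming reply pages and simply return 1 afterwards. A minimal sketch of the semantic change, with a plain buffer standing in for rq_arg.head[0]:

	#include <stddef.h>

	struct arg_buf {
		char	*base;		/* iov_base */
		size_t	 len;		/* iov_len  */
	};

	/* Old behavior: any pointer inside the buffer passed the check. */
	int argsize_check_old(const struct arg_buf *b, const char *p)
	{
		return p >= b->base && p <= b->base + b->len;
	}

	/* New behavior: the decoder must consume exactly the bytes received. */
	int argsize_check_new(const struct arg_buf *b, const char *p)
	{
		return p == b->base + b->len;
	}
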
index b105f73..f3787d8 100644 (file)
 #include <rdma/rdma_cm.h>
 #define SVCRDMA_DEBUG
 
+/* Default and maximum inline threshold sizes */
+enum {
+       RPCRDMA_DEF_INLINE_THRESH = 4096,
+       RPCRDMA_MAX_INLINE_THRESH = 65536
+};
+
 /* RPC/RDMA parameters and stats */
 extern unsigned int svcrdma_ord;
 extern unsigned int svcrdma_max_requests;
@@ -85,27 +91,11 @@ struct svc_rdma_op_ctxt {
        enum dma_data_direction direction;
        int count;
        unsigned int mapped_sges;
-       struct ib_sge sge[RPCSVC_MAXPAGES];
+       struct ib_send_wr send_wr;
+       struct ib_sge sge[1 + RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE];
        struct page *pages[RPCSVC_MAXPAGES];
 };
 
-/*
- * NFS_ requests are mapped on the client side by the chunk lists in
- * the RPCRDMA header. During the fetching of the RPC from the client
- * and the writing of the reply to the client, the memory in the
- * client and the memory in the server must be mapped as contiguous
- * vaddr/len for access by the hardware. These data strucures keep
- * these mappings.
- *
- * For an RDMA_WRITE, the 'sge' maps the RPC REPLY. For RDMA_READ, the
- * 'sge' in the svc_rdma_req_map maps the server side RPC reply and the
- * 'ch' field maps the read-list of the RPCRDMA header to the 'sge'
- * mapping of the reply.
- */
-struct svc_rdma_chunk_sge {
-       int start;              /* sge no for this chunk */
-       int count;              /* sge count for this chunk */
-};
 struct svc_rdma_fastreg_mr {
        struct ib_mr *mr;
        struct scatterlist *sg;
@@ -114,15 +104,7 @@ struct svc_rdma_fastreg_mr {
        enum dma_data_direction direction;
        struct list_head frmr_list;
 };
-struct svc_rdma_req_map {
-       struct list_head free;
-       unsigned long count;
-       union {
-               struct kvec sge[RPCSVC_MAXPAGES];
-               struct svc_rdma_chunk_sge ch[RPCSVC_MAXPAGES];
-               unsigned long lkey[RPCSVC_MAXPAGES];
-       };
-};
+
 #define RDMACTXT_F_LAST_CTXT   2
 
 #define        SVCRDMA_DEVCAP_FAST_REG         1       /* fast mr registration */
@@ -144,14 +126,15 @@ struct svcxprt_rdma {
        u32                  sc_max_requests;   /* Max requests */
        u32                  sc_max_bc_requests;/* Backward credits */
        int                  sc_max_req_size;   /* Size of each RQ WR buf */
+       u8                   sc_port_num;
 
        struct ib_pd         *sc_pd;
 
        spinlock_t           sc_ctxt_lock;
        struct list_head     sc_ctxts;
        int                  sc_ctxt_used;
-       spinlock_t           sc_map_lock;
-       struct list_head     sc_maps;
+       spinlock_t           sc_rw_ctxt_lock;
+       struct list_head     sc_rw_ctxts;
 
        struct list_head     sc_rq_dto_q;
        spinlock_t           sc_rq_dto_lock;
@@ -181,9 +164,7 @@ struct svcxprt_rdma {
 /* The default ORD value is based on two outstanding full-size writes with a
  * page size of 4k, or 32k * 2 ops / 4k = 16 outstanding RDMA_READ.  */
 #define RPCRDMA_ORD             (64/4)
-#define RPCRDMA_SQ_DEPTH_MULT   8
 #define RPCRDMA_MAX_REQUESTS    32
-#define RPCRDMA_MAX_REQ_SIZE    4096
 
 /* Typical ULP usage of BC requests is NFSv4.1 backchannel. Our
  * current NFSv4.1 implementation supports one backchannel slot.
@@ -201,19 +182,11 @@ static inline void svc_rdma_count_mappings(struct svcxprt_rdma *rdma,
 
 /* svc_rdma_backchannel.c */
 extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
-                                   struct rpcrdma_msg *rmsgp,
+                                   __be32 *rdma_resp,
                                    struct xdr_buf *rcvbuf);
 
 /* svc_rdma_marshal.c */
 extern int svc_rdma_xdr_decode_req(struct xdr_buf *);
-extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
-                                    struct rpcrdma_msg *,
-                                    enum rpcrdma_errcode, __be32 *);
-extern void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *, int);
-extern void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *, int);
-extern void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *, int,
-                                           __be32, __be64, u32);
-extern unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp);
 
 /* svc_rdma_recvfrom.c */
 extern int svc_rdma_recvfrom(struct svc_rqst *);
@@ -224,16 +197,25 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,
                                struct svc_rdma_op_ctxt *, int *, u32 *,
                                u32, u32, u64, bool);
 
+/* svc_rdma_rw.c */
+extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
+extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
+                                    __be32 *wr_ch, struct xdr_buf *xdr);
+extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
+                                    __be32 *rp_ch, bool writelist,
+                                    struct xdr_buf *xdr);
+
 /* svc_rdma_sendto.c */
-extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *,
-                           struct svc_rdma_req_map *, bool);
+extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
+                                 struct svc_rdma_op_ctxt *ctxt,
+                                 __be32 *rdma_resp, unsigned int len);
+extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
+                                struct svc_rdma_op_ctxt *ctxt,
+                                int num_sge, u32 inv_rkey);
 extern int svc_rdma_sendto(struct svc_rqst *);
-extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
-                               int);
 
 /* svc_rdma_transport.c */
 extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
-extern void svc_rdma_wc_write(struct ib_cq *, struct ib_wc *);
 extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
 extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
 extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
@@ -244,9 +226,6 @@ extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
 extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
 extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
 extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
-extern struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *);
-extern void svc_rdma_put_req_map(struct svcxprt_rdma *,
-                                struct svc_rdma_req_map *);
 extern struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *);
 extern void svc_rdma_put_frmr(struct svcxprt_rdma *,
                              struct svc_rdma_fastreg_mr *);
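
A quick sizing check on the shrunken Send SGE array (assuming 4 KiB pages; the enum below is illustrative, only RPCRDMA_MAX_INLINE_THRESH comes from the header above): the Send path now needs at most one SGE for the transport header plus one per inline payload page, rather than an RPCSVC_MAXPAGES-sized array, because bulk payload moves through the svc_rdma_rw.c helpers declared above.

	enum {
		EXAMPLE_PAGE_SIZE  = 4096,
		EXAMPLE_MAX_INLINE = 65536,	/* RPCRDMA_MAX_INLINE_THRESH */
		EXAMPLE_SEND_SGES  = 1 + EXAMPLE_MAX_INLINE / EXAMPLE_PAGE_SIZE, /* 17 */
	};
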
index f14a9ab..ec26027 100644 (file)
@@ -22,6 +22,8 @@
 #ifndef _NFSD_CLD_H
 #define _NFSD_CLD_H
 
+#include <linux/types.h>
+
 /* latest upcall version available */
 #define CLD_UPCALL_VERSION 1
 
@@ -37,18 +39,18 @@ enum cld_command {
 
 /* representation of long-form NFSv4 client ID */
 struct cld_name {
-       uint16_t        cn_len;                         /* length of cm_id */
+       __u16           cn_len;                         /* length of cm_id */
        unsigned char   cn_id[NFS4_OPAQUE_LIMIT];       /* client-provided */
 } __attribute__((packed));
 
 /* message struct for communication with userspace */
 struct cld_msg {
-       uint8_t         cm_vers;                /* upcall version */
-       uint8_t         cm_cmd;                 /* upcall command */
-       int16_t         cm_status;              /* return code */
-       uint32_t        cm_xid;                 /* transaction id */
+       __u8            cm_vers;                /* upcall version */
+       __u8            cm_cmd;                 /* upcall command */
+       __s16           cm_status;              /* return code */
+       __u32           cm_xid;                 /* transaction id */
        union {
-               int64_t         cm_gracetime;   /* grace period start time */
+               __s64           cm_gracetime;   /* grace period start time */
                struct cld_name cm_name;
        } __attribute__((packed)) cm_u;
 } __attribute__((packed));
index 04ce2c0..ac09ca8 100644 (file)
@@ -52,6 +52,7 @@ config SUNRPC_XPRT_RDMA
        tristate "RPC-over-RDMA transport"
        depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS
        default SUNRPC && INFINIBAND
+       select SG_POOL
        help
          This option allows the NFS client and server to use RDMA
          transports (InfiniBand, iWARP, or RoCE).
index a08aeb5..bc0f5a0 100644 (file)
@@ -702,59 +702,32 @@ found_pool:
        return task;
 }
 
-/*
- * Create or destroy enough new threads to make the number
- * of threads the given number.  If `pool' is non-NULL, applies
- * only to threads in that pool, otherwise round-robins between
- * all pools.  Caller must ensure that mutual exclusion between this and
- * server startup or shutdown.
- *
- * Destroying threads relies on the service threads filling in
- * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
- * has been created using svc_create_pooled().
- *
- * Based on code that used to be in nfsd_svc() but tweaked
- * to be pool-aware.
- */
-int
-svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+/* create new threads */
+static int
+svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 {
        struct svc_rqst *rqstp;
        struct task_struct *task;
        struct svc_pool *chosen_pool;
-       int error = 0;
        unsigned int state = serv->sv_nrthreads-1;
        int node;
 
-       if (pool == NULL) {
-               /* The -1 assumes caller has done a svc_get() */
-               nrservs -= (serv->sv_nrthreads-1);
-       } else {
-               spin_lock_bh(&pool->sp_lock);
-               nrservs -= pool->sp_nrthreads;
-               spin_unlock_bh(&pool->sp_lock);
-       }
-
-       /* create new threads */
-       while (nrservs > 0) {
+       do {
                nrservs--;
                chosen_pool = choose_pool(serv, pool, &state);
 
                node = svc_pool_map_get_node(chosen_pool->sp_id);
                rqstp = svc_prepare_thread(serv, chosen_pool, node);
-               if (IS_ERR(rqstp)) {
-                       error = PTR_ERR(rqstp);
-                       break;
-               }
+               if (IS_ERR(rqstp))
+                       return PTR_ERR(rqstp);
 
                __module_get(serv->sv_ops->svo_module);
                task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp,
                                              node, "%s", serv->sv_name);
                if (IS_ERR(task)) {
-                       error = PTR_ERR(task);
                        module_put(serv->sv_ops->svo_module);
                        svc_exit_thread(rqstp);
-                       break;
+                       return PTR_ERR(task);
                }
 
                rqstp->rq_task = task;
@@ -763,18 +736,103 @@ svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 
                svc_sock_update_bufs(serv);
                wake_up_process(task);
-       }
+       } while (nrservs > 0);
+
+       return 0;
+}
+
+
+/* destroy old threads */
+static int
+svc_signal_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+{
+       struct task_struct *task;
+       unsigned int state = serv->sv_nrthreads-1;
+
        /* destroy old threads */
-       while (nrservs < 0 &&
-              (task = choose_victim(serv, pool, &state)) != NULL) {
+       do {
+               task = choose_victim(serv, pool, &state);
+               if (task == NULL)
+                       break;
                send_sig(SIGINT, task, 1);
                nrservs++;
+       } while (nrservs < 0);
+
+       return 0;
+}
+
+/*
+ * Create or destroy enough new threads to make the number
+ * of threads the given number.  If `pool' is non-NULL, applies
+ * only to threads in that pool, otherwise round-robins between
+ * all pools.  Caller must ensure that mutual exclusion between this and
+ * server startup or shutdown.
+ *
+ * Destroying threads relies on the service threads filling in
+ * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
+ * has been created using svc_create_pooled().
+ *
+ * Based on code that used to be in nfsd_svc() but tweaked
+ * to be pool-aware.
+ */
+int
+svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+{
+       if (pool == NULL) {
+               /* The -1 assumes caller has done a svc_get() */
+               nrservs -= (serv->sv_nrthreads-1);
+       } else {
+               spin_lock_bh(&pool->sp_lock);
+               nrservs -= pool->sp_nrthreads;
+               spin_unlock_bh(&pool->sp_lock);
        }
 
-       return error;
+       if (nrservs > 0)
+               return svc_start_kthreads(serv, pool, nrservs);
+       if (nrservs < 0)
+               return svc_signal_kthreads(serv, pool, nrservs);
+       return 0;
 }
 EXPORT_SYMBOL_GPL(svc_set_num_threads);
 
+/* destroy old threads */
+static int
+svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+{
+       struct task_struct *task;
+       unsigned int state = serv->sv_nrthreads-1;
+
+       /* destroy old threads */
+       do {
+               task = choose_victim(serv, pool, &state);
+               if (task == NULL)
+                       break;
+               kthread_stop(task);
+               nrservs++;
+       } while (nrservs < 0);
+       return 0;
+}
+
+int
+svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+{
+       if (pool == NULL) {
+               /* The -1 assumes caller has done a svc_get() */
+               nrservs -= (serv->sv_nrthreads-1);
+       } else {
+               spin_lock_bh(&pool->sp_lock);
+               nrservs -= pool->sp_nrthreads;
+               spin_unlock_bh(&pool->sp_lock);
+       }
+
+       if (nrservs > 0)
+               return svc_start_kthreads(serv, pool, nrservs);
+       if (nrservs < 0)
+               return svc_stop_kthreads(serv, pool, nrservs);
+       return 0;
+}
+EXPORT_SYMBOL_GPL(svc_set_num_threads_sync);
+
 /*
  * Called from a server thread as it's exiting. Caller must hold the "service
  * mutex" for the service.
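
The refactor splits thread teardown into two flavors: svc_signal_kthreads() nudges each victim with SIGINT and returns, while svc_stop_kthreads() blocks in kthread_stop() until the victim has exited. A hypothetical caller of the new synchronous variant (illustrative only; assumes the caller holds the reference and the setup/shutdown mutex that the accounting above expects):

	static void example_shutdown(struct svc_serv *serv)
	{
		/* Returns only after every pool thread has run to completion. */
		svc_set_num_threads_sync(serv, NULL, 0);
		svc_destroy(serv);
	}
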
index ef19fa4..c1ae814 100644 (file)
@@ -4,5 +4,5 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
        fmr_ops.o frwr_ops.o \
        svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
        svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
-       module.o
+       svc_rdma_rw.o module.o
 rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
index c846ca9..a4a8f69 100644 (file)
@@ -58,9 +58,9 @@ unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS;
 unsigned int svcrdma_max_bc_requests = RPCRDMA_MAX_BC_REQUESTS;
 static unsigned int min_max_requests = 4;
 static unsigned int max_max_requests = 16384;
-unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE;
-static unsigned int min_max_inline = 4096;
-static unsigned int max_max_inline = 65536;
+unsigned int svcrdma_max_req_size = RPCRDMA_DEF_INLINE_THRESH;
+static unsigned int min_max_inline = RPCRDMA_DEF_INLINE_THRESH;
+static unsigned int max_max_inline = RPCRDMA_MAX_INLINE_THRESH;
 
 atomic_t rdma_stat_recv;
 atomic_t rdma_stat_read;
@@ -247,8 +247,6 @@ int svc_rdma_init(void)
        dprintk("SVCRDMA Module Init, register RPC RDMA transport\n");
        dprintk("\tsvcrdma_ord      : %d\n", svcrdma_ord);
        dprintk("\tmax_requests     : %u\n", svcrdma_max_requests);
-       dprintk("\tsq_depth         : %u\n",
-               svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT);
        dprintk("\tmax_bc_requests  : %u\n", svcrdma_max_bc_requests);
        dprintk("\tmax_inline       : %d\n", svcrdma_max_req_size);
 
index ff1df40..c676ed0 100644 (file)
 
 #undef SVCRDMA_BACKCHANNEL_DEBUG
 
-int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
+/**
+ * svc_rdma_handle_bc_reply - Process incoming backchannel reply
+ * @xprt: controlling backchannel transport
+ * @rdma_resp: pointer to incoming transport header
+ * @rcvbuf: XDR buffer into which to decode the reply
+ *
+ * Returns:
+ *     %0 if @rcvbuf is filled in, xprt_complete_rqst called,
+ *     %-EAGAIN if server should call ->recvfrom again.
+ */
+int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
                             struct xdr_buf *rcvbuf)
 {
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
@@ -27,13 +37,13 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
 
        p = (__be32 *)src->iov_base;
        len = src->iov_len;
-       xid = rmsgp->rm_xid;
+       xid = *rdma_resp;
 
 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
        pr_info("%s: xid=%08x, length=%zu\n",
                __func__, be32_to_cpu(xid), len);
        pr_info("%s: RPC/RDMA: %*ph\n",
-               __func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp);
+               __func__, (int)RPCRDMA_HDRLEN_MIN, rdma_resp);
        pr_info("%s:      RPC: %*ph\n",
                __func__, (int)len, p);
 #endif
@@ -53,7 +63,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
                goto out_unlock;
        memcpy(dst->iov_base, p, len);
 
-       credits = be32_to_cpu(rmsgp->rm_credit);
+       credits = be32_to_cpup(rdma_resp + 2);
        if (credits == 0)
                credits = 1;    /* don't deadlock */
        else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
@@ -90,9 +100,9 @@ out_notfound:
  * Caller holds the connection's mutex and has already marshaled
  * the RPC/RDMA request.
  *
- * This is similar to svc_rdma_reply, but takes an rpc_rqst
- * instead, does not support chunks, and avoids blocking memory
- * allocation.
+ * This is similar to svc_rdma_send_reply_msg, but takes a struct
+ * rpc_rqst instead, does not support chunks, and avoids blocking
+ * memory allocation.
  *
  * XXX: There is still an opportunity to block in svc_rdma_send()
  * if there are no SQ entries to post the Send. This may occur if
@@ -101,59 +111,36 @@ out_notfound:
 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
                              struct rpc_rqst *rqst)
 {
-       struct xdr_buf *sndbuf = &rqst->rq_snd_buf;
        struct svc_rdma_op_ctxt *ctxt;
-       struct svc_rdma_req_map *vec;
-       struct ib_send_wr send_wr;
        int ret;
 
-       vec = svc_rdma_get_req_map(rdma);
-       ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false);
-       if (ret)
+       ctxt = svc_rdma_get_context(rdma);
+
+       /* rpcrdma_bc_send_request builds the transport header and
+        * the backchannel RPC message in the same buffer. Thus only
+        * one SGE is needed to send both.
+        */
+       ret = svc_rdma_map_reply_hdr(rdma, ctxt, rqst->rq_buffer,
+                                    rqst->rq_snd_buf.len);
+       if (ret < 0)
                goto out_err;
 
        ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
        if (ret)
                goto out_err;
 
-       ctxt = svc_rdma_get_context(rdma);
-       ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
-       ctxt->count = 1;
-
-       ctxt->direction = DMA_TO_DEVICE;
-       ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
-       ctxt->sge[0].length = sndbuf->len;
-       ctxt->sge[0].addr =
-           ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0,
-                           sndbuf->len, DMA_TO_DEVICE);
-       if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) {
-               ret = -EIO;
-               goto out_unmap;
-       }
-       svc_rdma_count_mappings(rdma, ctxt);
-
-       memset(&send_wr, 0, sizeof(send_wr));
-       ctxt->cqe.done = svc_rdma_wc_send;
-       send_wr.wr_cqe = &ctxt->cqe;
-       send_wr.sg_list = ctxt->sge;
-       send_wr.num_sge = 1;
-       send_wr.opcode = IB_WR_SEND;
-       send_wr.send_flags = IB_SEND_SIGNALED;
-
-       ret = svc_rdma_send(rdma, &send_wr);
-       if (ret) {
-               ret = -EIO;
+       ret = svc_rdma_post_send_wr(rdma, ctxt, 1, 0);
+       if (ret)
                goto out_unmap;
-       }
 
 out_err:
-       svc_rdma_put_req_map(rdma, vec);
        dprintk("svcrdma: %s returns %d\n", __func__, ret);
        return ret;
 
 out_unmap:
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 1);
+       ret = -EIO;
        goto out_err;
 }
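
The open-coded word offsets used above (xid at *rdma_resp, credits at rdma_resp + 2) and in the receive path below (message type at rdma_resp + 3) follow the fixed portion of the RPC-over-RDMA transport header. Illustrative offsets in 32-bit XDR words (the enum names are not kernel identifiers):

	enum {
		RDMA_HDR_XID	= 0,	/* rm_xid: mirrors the RPC XID */
		RDMA_HDR_VERS	= 1,	/* rm_vers: RPC-over-RDMA version one */
		RDMA_HDR_CREDIT	= 2,	/* rm_credit: credit grant */
		RDMA_HDR_PROC	= 3,	/* rm_type: rdma_msg, rdma_nomsg, rdma_error */
	};
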
 
index 1c4aabf..bdcf7d8 100644 (file)
@@ -166,92 +166,3 @@ out_inval:
        dprintk("svcrdma: failed to parse transport header\n");
        return -EINVAL;
 }
-
-int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt,
-                             struct rpcrdma_msg *rmsgp,
-                             enum rpcrdma_errcode err, __be32 *va)
-{
-       __be32 *startp = va;
-
-       *va++ = rmsgp->rm_xid;
-       *va++ = rmsgp->rm_vers;
-       *va++ = xprt->sc_fc_credits;
-       *va++ = rdma_error;
-       *va++ = cpu_to_be32(err);
-       if (err == ERR_VERS) {
-               *va++ = rpcrdma_version;
-               *va++ = rpcrdma_version;
-       }
-
-       return (int)((unsigned long)va - (unsigned long)startp);
-}
-
-/**
- * svc_rdma_xdr_get_reply_hdr_length - Get length of Reply transport header
- * @rdma_resp: buffer containing Reply transport header
- *
- * Returns length of transport header, in bytes.
- */
-unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp)
-{
-       unsigned int nsegs;
-       __be32 *p;
-
-       p = rdma_resp;
-
-       /* RPC-over-RDMA V1 replies never have a Read list. */
-       p += rpcrdma_fixed_maxsz + 1;
-
-       /* Skip Write list. */
-       while (*p++ != xdr_zero) {
-               nsegs = be32_to_cpup(p++);
-               p += nsegs * rpcrdma_segment_maxsz;
-       }
-
-       /* Skip Reply chunk. */
-       if (*p++ != xdr_zero) {
-               nsegs = be32_to_cpup(p++);
-               p += nsegs * rpcrdma_segment_maxsz;
-       }
-
-       return (unsigned long)p - (unsigned long)rdma_resp;
-}
-
-void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
-{
-       struct rpcrdma_write_array *ary;
-
-       /* no read-list */
-       rmsgp->rm_body.rm_chunks[0] = xdr_zero;
-
-       /* write-array discrim */
-       ary = (struct rpcrdma_write_array *)
-               &rmsgp->rm_body.rm_chunks[1];
-       ary->wc_discrim = xdr_one;
-       ary->wc_nchunks = cpu_to_be32(chunks);
-
-       /* write-list terminator */
-       ary->wc_array[chunks].wc_target.rs_handle = xdr_zero;
-
-       /* reply-array discriminator */
-       ary->wc_array[chunks].wc_target.rs_length = xdr_zero;
-}
-
-void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,
-                                int chunks)
-{
-       ary->wc_discrim = xdr_one;
-       ary->wc_nchunks = cpu_to_be32(chunks);
-}
-
-void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,
-                                    int chunk_no,
-                                    __be32 rs_handle,
-                                    __be64 rs_offset,
-                                    u32 write_len)
-{
-       struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target;
-       seg->rs_handle = rs_handle;
-       seg->rs_offset = rs_offset;
-       seg->rs_length = cpu_to_be32(write_len);
-}
index f7b2daf..27a99bf 100644 (file)
@@ -558,33 +558,85 @@ static void rdma_read_complete(struct svc_rqst *rqstp,
        rqstp->rq_arg.buflen = head->arg.buflen;
 }
 
+static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
+                               __be32 *rdma_argp, int status)
+{
+       struct svc_rdma_op_ctxt *ctxt;
+       __be32 *p, *err_msgp;
+       unsigned int length;
+       struct page *page;
+       int ret;
+
+       ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
+       if (ret)
+               return;
+
+       page = alloc_page(GFP_KERNEL);
+       if (!page)
+               return;
+       err_msgp = page_address(page);
+
+       p = err_msgp;
+       *p++ = *rdma_argp;
+       *p++ = *(rdma_argp + 1);
+       *p++ = xprt->sc_fc_credits;
+       *p++ = rdma_error;
+       if (status == -EPROTONOSUPPORT) {
+               *p++ = err_vers;
+               *p++ = rpcrdma_version;
+               *p++ = rpcrdma_version;
+       } else {
+               *p++ = err_chunk;
+       }
+       length = (unsigned long)p - (unsigned long)err_msgp;
+
+       /* Map transport header; no RPC message payload */
+       ctxt = svc_rdma_get_context(xprt);
+       ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
+       if (ret) {
+               dprintk("svcrdma: Error %d mapping send for protocol error\n",
+                       ret);
+               return;
+       }
+
+       ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0);
+       if (ret) {
+               dprintk("svcrdma: Error %d posting send for protocol error\n",
+                       ret);
+               svc_rdma_unmap_dma(ctxt);
+               svc_rdma_put_context(ctxt, 1);
+       }
+}
+
 /* By convention, backchannel calls arrive via rdma_msg type
  * messages, and never populate the chunk lists. This makes
  * the RPC/RDMA header small and fixed in size, so it is
  * straightforward to check the RPC header's direction field.
  */
-static bool
-svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, struct rpcrdma_msg *rmsgp)
+static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt,
+                                         __be32 *rdma_resp)
 {
-       __be32 *p = (__be32 *)rmsgp;
+       __be32 *p;
 
        if (!xprt->xpt_bc_xprt)
                return false;
 
-       if (rmsgp->rm_type != rdma_msg)
+       p = rdma_resp + 3;
+       if (*p++ != rdma_msg)
                return false;
-       if (rmsgp->rm_body.rm_chunks[0] != xdr_zero)
+
+       if (*p++ != xdr_zero)
                return false;
-       if (rmsgp->rm_body.rm_chunks[1] != xdr_zero)
+       if (*p++ != xdr_zero)
                return false;
-       if (rmsgp->rm_body.rm_chunks[2] != xdr_zero)
+       if (*p++ != xdr_zero)
                return false;
 
-       /* sanity */
-       if (p[7] != rmsgp->rm_xid)
+       /* XID sanity */
+       if (*p++ != *rdma_resp)
                return false;
        /* call direction */
-       if (p[8] == cpu_to_be32(RPC_CALL))
+       if (*p == cpu_to_be32(RPC_CALL))
                return false;
 
        return true;
@@ -650,8 +702,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
                goto out_drop;
        rqstp->rq_xprt_hlen = ret;
 
-       if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
-               ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp,
+       if (svc_rdma_is_backchannel_reply(xprt, &rmsgp->rm_xid)) {
+               ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt,
+                                              &rmsgp->rm_xid,
                                               &rqstp->rq_arg);
                svc_rdma_put_context(ctxt, 0);
                if (ret)
@@ -686,7 +739,7 @@ complete:
        return ret;
 
 out_err:
-       svc_rdma_send_error(rdma_xprt, rmsgp, ret);
+       svc_rdma_send_error(rdma_xprt, &rmsgp->rm_xid, ret);
        svc_rdma_put_context(ctxt, 0);
        return 0;
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
new file mode 100644 (file)
index 0000000..0cf6202
--- /dev/null
@@ -0,0 +1,512 @@
+/*
+ * Copyright (c) 2016 Oracle.  All rights reserved.
+ *
+ * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
+ */
+
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/sunrpc/svc_rdma.h>
+#include <linux/sunrpc/debug.h>
+
+#include <rdma/rw.h>
+
+#define RPCDBG_FACILITY        RPCDBG_SVCXPRT
+
+/* Each R/W context contains state for one chain of RDMA Read or
+ * Write Work Requests.
+ *
+ * Each WR chain handles a single contiguous server-side buffer,
+ * because scatterlist entries after the first have to start on
+ * page alignment. xdr_buf iovecs cannot guarantee alignment.
+ *
+ * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
+ * from a client may contain a unique R_key, so each WR chain moves
+ * up to one segment at a time.
+ *
+ * The scatterlist makes this data structure over 4KB in size. To
+ * make it less likely to fail, and to handle the allocation for
+ * smaller I/O requests without disabling bottom-halves, these
+ * contexts are created on demand, but cached and reused until the
+ * controlling svcxprt_rdma is destroyed.
+ */
+struct svc_rdma_rw_ctxt {
+       struct list_head        rw_list;
+       struct rdma_rw_ctx      rw_ctx;
+       int                     rw_nents;
+       struct sg_table         rw_sg_table;
+       struct scatterlist      rw_first_sgl[0];
+};
+
+static inline struct svc_rdma_rw_ctxt *
+svc_rdma_next_ctxt(struct list_head *list)
+{
+       return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
+                                       rw_list);
+}
+
+static struct svc_rdma_rw_ctxt *
+svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
+{
+       struct svc_rdma_rw_ctxt *ctxt;
+
+       spin_lock(&rdma->sc_rw_ctxt_lock);
+
+       ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
+       if (ctxt) {
+               list_del(&ctxt->rw_list);
+               spin_unlock(&rdma->sc_rw_ctxt_lock);
+       } else {
+               spin_unlock(&rdma->sc_rw_ctxt_lock);
+               ctxt = kmalloc(sizeof(*ctxt) +
+                              SG_CHUNK_SIZE * sizeof(struct scatterlist),
+                              GFP_KERNEL);
+               if (!ctxt)
+                       goto out;
+               INIT_LIST_HEAD(&ctxt->rw_list);
+       }
+
+       ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
+       if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
+                                  ctxt->rw_sg_table.sgl)) {
+               kfree(ctxt);
+               ctxt = NULL;
+       }
+out:
+       return ctxt;
+}
+
+static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
+                                struct svc_rdma_rw_ctxt *ctxt)
+{
+       sg_free_table_chained(&ctxt->rw_sg_table, true);
+
+       spin_lock(&rdma->sc_rw_ctxt_lock);
+       list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
+       spin_unlock(&rdma->sc_rw_ctxt_lock);
+}
+
+/**
+ * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
+ * @rdma: transport about to be destroyed
+ *
+ */
+void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
+{
+       struct svc_rdma_rw_ctxt *ctxt;
+
+       while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
+               list_del(&ctxt->rw_list);
+               kfree(ctxt);
+       }
+}
+
+/* A chunk context tracks all I/O for moving one Read or Write
+ * chunk. This is a set of rdma_rw's that handle data movement
+ * for all segments of one chunk.
+ *
+ * These are small, acquired with a single allocator call, and
+ * no more than one is needed per chunk. They are allocated on
+ * demand, and not cached.
+ */
+struct svc_rdma_chunk_ctxt {
+       struct ib_cqe           cc_cqe;
+       struct svcxprt_rdma     *cc_rdma;
+       struct list_head        cc_rwctxts;
+       int                     cc_sqecount;
+       enum dma_data_direction cc_dir;
+};
+
+static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
+                            struct svc_rdma_chunk_ctxt *cc,
+                            enum dma_data_direction dir)
+{
+       cc->cc_rdma = rdma;
+       svc_xprt_get(&rdma->sc_xprt);
+
+       INIT_LIST_HEAD(&cc->cc_rwctxts);
+       cc->cc_sqecount = 0;
+       cc->cc_dir = dir;
+}
+
+static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc)
+{
+       struct svcxprt_rdma *rdma = cc->cc_rdma;
+       struct svc_rdma_rw_ctxt *ctxt;
+
+       while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
+               list_del(&ctxt->rw_list);
+
+               rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
+                                   rdma->sc_port_num, ctxt->rw_sg_table.sgl,
+                                   ctxt->rw_nents, cc->cc_dir);
+               svc_rdma_put_rw_ctxt(rdma, ctxt);
+       }
+       svc_xprt_put(&rdma->sc_xprt);
+}
+
+/* State for sending a Write or Reply chunk.
+ *  - Tracks progress of writing one chunk over all its segments
+ *  - Stores arguments for the SGL constructor functions
+ */
+struct svc_rdma_write_info {
+       /* write state of this chunk */
+       unsigned int            wi_seg_off;
+       unsigned int            wi_seg_no;
+       unsigned int            wi_nsegs;
+       __be32                  *wi_segs;
+
+       /* SGL constructor arguments */
+       struct xdr_buf          *wi_xdr;
+       unsigned char           *wi_base;
+       unsigned int            wi_next_off;
+
+       struct svc_rdma_chunk_ctxt      wi_cc;
+};
+
+static struct svc_rdma_write_info *
+svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
+{
+       struct svc_rdma_write_info *info;
+
+       info = kmalloc(sizeof(*info), GFP_KERNEL);
+       if (!info)
+               return info;
+
+       info->wi_seg_off = 0;
+       info->wi_seg_no = 0;
+       info->wi_nsegs = be32_to_cpup(++chunk);
+       info->wi_segs = ++chunk;
+       svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE);
+       return info;
+}
+
+static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
+{
+       svc_rdma_cc_release(&info->wi_cc);
+       kfree(info);
+}
+
+/**
+ * svc_rdma_write_done - Write chunk completion
+ * @cq: controlling Completion Queue
+ * @wc: Work Completion
+ *
+ * Pages under I/O are freed by a subsequent Send completion.
+ */
+static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+       struct ib_cqe *cqe = wc->wr_cqe;
+       struct svc_rdma_chunk_ctxt *cc =
+                       container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
+       struct svcxprt_rdma *rdma = cc->cc_rdma;
+       struct svc_rdma_write_info *info =
+                       container_of(cc, struct svc_rdma_write_info, wi_cc);
+
+       atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+       wake_up(&rdma->sc_send_wait);
+
+       if (unlikely(wc->status != IB_WC_SUCCESS)) {
+               set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+               if (wc->status != IB_WC_WR_FLUSH_ERR)
+                       pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
+                              ib_wc_status_msg(wc->status),
+                              wc->status, wc->vendor_err);
+       }
+
+       svc_rdma_write_info_free(info);
+}
+
+/* This function sleeps when the transport's Send Queue is congested.
+ *
+ * Assumptions:
+ * - If ib_post_send() succeeds, only one completion is expected,
+ *   even if one or more WRs are flushed. This is true when posting
+ *   an rdma_rw_ctx or when posting a single signaled WR.
+ */
+static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
+{
+       struct svcxprt_rdma *rdma = cc->cc_rdma;
+       struct svc_xprt *xprt = &rdma->sc_xprt;
+       struct ib_send_wr *first_wr, *bad_wr;
+       struct list_head *tmp;
+       struct ib_cqe *cqe;
+       int ret;
+
+       first_wr = NULL;
+       cqe = &cc->cc_cqe;
+       list_for_each(tmp, &cc->cc_rwctxts) {
+               struct svc_rdma_rw_ctxt *ctxt;
+
+               ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
+               first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
+                                          rdma->sc_port_num, cqe, first_wr);
+               cqe = NULL;
+       }
+
+       do {
+               if (atomic_sub_return(cc->cc_sqecount,
+                                     &rdma->sc_sq_avail) > 0) {
+                       ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
+                       if (ret)
+                               break;
+                       return 0;
+               }
+
+               atomic_inc(&rdma_stat_sq_starve);
+               atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+               wait_event(rdma->sc_send_wait,
+                          atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
+       } while (1);
+
+       pr_err("svcrdma: ib_post_send failed (%d)\n", ret);
+       set_bit(XPT_CLOSE, &xprt->xpt_flags);
+
+       /* If even one was posted, there will be a completion. */
+       if (bad_wr != first_wr)
+               return 0;
+
+       atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+       wake_up(&rdma->sc_send_wait);
+       return -ENOTCONN;
+}
+
+/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
+ */
+static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
+                              unsigned int len,
+                              struct svc_rdma_rw_ctxt *ctxt)
+{
+       struct scatterlist *sg = ctxt->rw_sg_table.sgl;
+
+       sg_set_buf(&sg[0], info->wi_base, len);
+       info->wi_base += len;
+
+       ctxt->rw_nents = 1;
+}
+
+/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
+ */
+static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
+                                   unsigned int remaining,
+                                   struct svc_rdma_rw_ctxt *ctxt)
+{
+       unsigned int sge_no, sge_bytes, page_off, page_no;
+       struct xdr_buf *xdr = info->wi_xdr;
+       struct scatterlist *sg;
+       struct page **page;
+
+       page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK;
+       page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT;
+       page = xdr->pages + page_no;
+       info->wi_next_off += remaining;
+       sg = ctxt->rw_sg_table.sgl;
+       sge_no = 0;
+       do {
+               sge_bytes = min_t(unsigned int, remaining,
+                                 PAGE_SIZE - page_off);
+               sg_set_page(sg, *page, sge_bytes, page_off);
+
+               remaining -= sge_bytes;
+               sg = sg_next(sg);
+               page_off = 0;
+               sge_no++;
+               page++;
+       } while (remaining);
+
+       ctxt->rw_nents = sge_no;
+}
+
+/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
+ * an RPC Reply.
+ */
+static int
+svc_rdma_build_writes(struct svc_rdma_write_info *info,
+                     void (*constructor)(struct svc_rdma_write_info *info,
+                                         unsigned int len,
+                                         struct svc_rdma_rw_ctxt *ctxt),
+                     unsigned int remaining)
+{
+       struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
+       struct svcxprt_rdma *rdma = cc->cc_rdma;
+       struct svc_rdma_rw_ctxt *ctxt;
+       __be32 *seg;
+       int ret;
+
+       cc->cc_cqe.done = svc_rdma_write_done;
+       seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
+       do {
+               unsigned int write_len;
+               u32 seg_length, seg_handle;
+               u64 seg_offset;
+
+               if (info->wi_seg_no >= info->wi_nsegs)
+                       goto out_overflow;
+
+               seg_handle = be32_to_cpup(seg);
+               seg_length = be32_to_cpup(seg + 1);
+               xdr_decode_hyper(seg + 2, &seg_offset);
+               seg_offset += info->wi_seg_off;
+
+               write_len = min(remaining, seg_length - info->wi_seg_off);
+               ctxt = svc_rdma_get_rw_ctxt(rdma,
+                                           (write_len >> PAGE_SHIFT) + 2);
+               if (!ctxt)
+                       goto out_noctx;
+
+               constructor(info, write_len, ctxt);
+               ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
+                                      rdma->sc_port_num, ctxt->rw_sg_table.sgl,
+                                      ctxt->rw_nents, 0, seg_offset,
+                                      seg_handle, DMA_TO_DEVICE);
+               if (ret < 0)
+                       goto out_initerr;
+
+               list_add(&ctxt->rw_list, &cc->cc_rwctxts);
+               cc->cc_sqecount += ret;
+               if (write_len == seg_length - info->wi_seg_off) {
+                       seg += 4;
+                       info->wi_seg_no++;
+                       info->wi_seg_off = 0;
+               } else {
+                       info->wi_seg_off += write_len;
+               }
+               remaining -= write_len;
+       } while (remaining);
+
+       return 0;
+
+out_overflow:
+       dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
+               info->wi_nsegs);
+       return -E2BIG;
+
+out_noctx:
+       dprintk("svcrdma: no R/W ctxs available\n");
+       return -ENOMEM;
+
+out_initerr:
+       svc_rdma_put_rw_ctxt(rdma, ctxt);
+       pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
+       return -EIO;
+}
+
+/* Send one of an xdr_buf's kvecs by itself. To send a Reply
+ * chunk, the whole RPC Reply is written back to the client.
+ * This function writes either the head or tail of the xdr_buf
+ * containing the Reply.
+ */
+static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
+                                 struct kvec *vec)
+{
+       info->wi_base = vec->iov_base;
+       return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
+                                    vec->iov_len);
+}
+
+/* Send an xdr_buf's page list by itself. A Write chunk is
+ * just the page list. A Reply chunk is the head, page list,
+ * and tail. This function is shared between the two types
+ * of chunk.
+ */
+static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
+                                     struct xdr_buf *xdr)
+{
+       info->wi_xdr = xdr;
+       info->wi_next_off = 0;
+       return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
+                                    xdr->page_len);
+}
+
+/**
+ * svc_rdma_send_write_chunk - Write all segments in a Write chunk
+ * @rdma: controlling RDMA transport
+ * @wr_ch: Write chunk provided by client
+ * @xdr: xdr_buf containing the data payload
+ *
+ * Returns a non-negative number of bytes the chunk consumed, or
+ *     %-E2BIG if the payload was larger than the Write chunk,
+ *     %-ENOMEM if rdma_rw context pool was exhausted,
+ *     %-ENOTCONN if posting failed (connection is lost),
+ *     %-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ */
+int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
+                             struct xdr_buf *xdr)
+{
+       struct svc_rdma_write_info *info;
+       int ret;
+
+       if (!xdr->page_len)
+               return 0;
+
+       info = svc_rdma_write_info_alloc(rdma, wr_ch);
+       if (!info)
+               return -ENOMEM;
+
+       ret = svc_rdma_send_xdr_pagelist(info, xdr);
+       if (ret < 0)
+               goto out_err;
+
+       ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
+       if (ret < 0)
+               goto out_err;
+       return xdr->page_len;
+
+out_err:
+       svc_rdma_write_info_free(info);
+       return ret;
+}
+
+/**
+ * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
+ * @rdma: controlling RDMA transport
+ * @rp_ch: Reply chunk provided by client
+ * @writelist: true if client provided a Write list
+ * @xdr: xdr_buf containing an RPC Reply
+ *
+ * Returns a non-negative number of bytes the chunk consumed, or
+ *     %-E2BIG if the payload was larger than the Reply chunk,
+ *     %-ENOMEM if rdma_rw context pool was exhausted,
+ *     %-ENOTCONN if posting failed (connection is lost),
+ *     %-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ */
+int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
+                             bool writelist, struct xdr_buf *xdr)
+{
+       struct svc_rdma_write_info *info;
+       int consumed, ret;
+
+       info = svc_rdma_write_info_alloc(rdma, rp_ch);
+       if (!info)
+               return -ENOMEM;
+
+       ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
+       if (ret < 0)
+               goto out_err;
+       consumed = xdr->head[0].iov_len;
+
+       /* Send the page list in the Reply chunk only if the
+        * client did not provide Write chunks.
+        */
+       if (!writelist && xdr->page_len) {
+               ret = svc_rdma_send_xdr_pagelist(info, xdr);
+               if (ret < 0)
+                       goto out_err;
+               consumed += xdr->page_len;
+       }
+
+       if (xdr->tail[0].iov_len) {
+               ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
+               if (ret < 0)
+                       goto out_err;
+               consumed += xdr->tail[0].iov_len;
+       }
+
+       ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
+       if (ret < 0)
+               goto out_err;
+       return consumed;
+
+out_err:
+       svc_rdma_write_info_free(info);
+       return ret;
+}
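
For orientation, here is a condensed sketch (not part of the patch) of how the reply path drives the two new entry points; svc_rdma_sendto() in svc_rdma_sendto.c below does essentially this, and the wrapper function name is invented for illustration only:

static int example_send_chunks(struct svcxprt_rdma *rdma, __be32 *rdma_resp,
                               __be32 *wr_lst, __be32 *rp_ch,
                               struct xdr_buf *xdr)
{
        int ret;

        if (wr_lst) {
                /* Push the pagelist payload into the client's Write chunk */
                ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
                if (ret < 0)
                        return ret;
                /* Record how many bytes each Write segment consumed */
                svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
        }
        if (rp_ch) {
                /* Push the whole Reply into the client's Reply chunk */
                ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst != NULL, xdr);
                if (ret < 0)
                        return ret;
                svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
        }
        return 0;
}

A negative return falls through to the error paths in svc_rdma_sendto(): -E2BIG selects an RDMA_ERROR reply, while any other failure closes the transport.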
index 515221b..1736337 100644 (file)
@@ -1,4 +1,5 @@
 /*
+ * Copyright (c) 2016 Oracle. All rights reserved.
  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
  *
  * Author: Tom Tucker <tom@opengridcomputing.com>
  */
 
+/* Operation
+ *
+ * The main entry point is svc_rdma_sendto. This is called by the
+ * RPC server when an RPC Reply is ready to be transmitted to a client.
+ *
+ * The passed-in svc_rqst contains a struct xdr_buf which holds an
+ * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
+ * transport header, post all Write WRs needed for this Reply, then post
+ * a Send WR conveying the transport header and the RPC message itself to
+ * the client.
+ *
+ * svc_rdma_sendto must fully transmit the Reply before returning, as
+ * the svc_rqst will be recycled as soon as sendto returns. Remaining
+ * resources referred to by the svc_rqst are also recycled at that time.
+ * Therefore any resources that must remain longer must be detached
+ * from the svc_rqst and released later.
+ *
+ * Page Management
+ *
+ * The I/O that performs Reply transmission is asynchronous, and may
+ * complete well after sendto returns. Thus pages under I/O must be
+ * removed from the svc_rqst before sendto returns.
+ *
+ * The logic here depends on Send Queue and completion ordering. Since
+ * the Send WR is always posted last, it will always complete last. Thus
+ * when it completes, it is guaranteed that all previous Write WRs have
+ * also completed.
+ *
+ * Write WRs are constructed and posted. Each Write segment gets its own
+ * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
+ * DMA-unmap the pages under I/O for that Write segment. The Write
+ * completion handler does not release any pages.
+ *
+ * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt.
+ * The ownership of all of the Reply's pages is transferred into that
+ * ctxt, the Send WR is posted, and sendto returns.
+ *
+ * The svc_rdma_op_ctxt is presented when the Send WR completes. The
+ * Send completion handler finally releases the Reply's pages.
+ *
+ * This mechanism also assumes that completions on the transport's Send
+ * Completion Queue do not run in parallel. Otherwise a Write completion
+ * and Send completion running at the same time could release pages that
+ * are still DMA-mapped.
+ *
+ * Error Handling
+ *
+ * - If the Send WR is posted successfully, it will either complete
+ *   successfully, or get flushed. Either way, the Send completion
+ *   handler releases the Reply's pages.
+ * - If the Send WR cannot be posted, the forward path releases
+ *   the Reply's pages.
+ *
+ * This handles the case, without the use of page reference counting,
+ * where two different Write segments send portions of the same page.
+ */
+
 #include <linux/sunrpc/debug.h>
 #include <linux/sunrpc/rpc_rdma.h>
 #include <linux/spinlock.h>
@@ -55,113 +113,141 @@ static u32 xdr_padsize(u32 len)
        return (len & 3) ? (4 - (len & 3)) : 0;
 }
 
-int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
-                    struct xdr_buf *xdr,
-                    struct svc_rdma_req_map *vec,
-                    bool write_chunk_present)
+/* Returns length of transport header, in bytes.
+ */
+static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
 {
-       int sge_no;
-       u32 sge_bytes;
-       u32 page_bytes;
-       u32 page_off;
-       int page_no;
-
-       if (xdr->len !=
-           (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
-               pr_err("svcrdma: %s: XDR buffer length error\n", __func__);
-               return -EIO;
-       }
+       unsigned int nsegs;
+       __be32 *p;
 
-       /* Skip the first sge, this is for the RPCRDMA header */
-       sge_no = 1;
+       p = rdma_resp;
+
+       /* RPC-over-RDMA V1 replies never have a Read list. */
+       p += rpcrdma_fixed_maxsz + 1;
 
-       /* Head SGE */
-       vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
-       vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
-       sge_no++;
-
-       /* pages SGE */
-       page_no = 0;
-       page_bytes = xdr->page_len;
-       page_off = xdr->page_base;
-       while (page_bytes) {
-               vec->sge[sge_no].iov_base =
-                       page_address(xdr->pages[page_no]) + page_off;
-               sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
-               page_bytes -= sge_bytes;
-               vec->sge[sge_no].iov_len = sge_bytes;
-
-               sge_no++;
-               page_no++;
-               page_off = 0; /* reset for next time through loop */
+       /* Skip Write list. */
+       while (*p++ != xdr_zero) {
+               nsegs = be32_to_cpup(p++);
+               p += nsegs * rpcrdma_segment_maxsz;
        }
 
-       /* Tail SGE */
-       if (xdr->tail[0].iov_len) {
-               unsigned char *base = xdr->tail[0].iov_base;
-               size_t len = xdr->tail[0].iov_len;
-               u32 xdr_pad = xdr_padsize(xdr->page_len);
+       /* Skip Reply chunk. */
+       if (*p++ != xdr_zero) {
+               nsegs = be32_to_cpup(p++);
+               p += nsegs * rpcrdma_segment_maxsz;
+       }
 
-               if (write_chunk_present && xdr_pad) {
-                       base += xdr_pad;
-                       len -= xdr_pad;
-               }
+       return (unsigned long)p - (unsigned long)rdma_resp;
+}
 
-               if (len) {
-                       vec->sge[sge_no].iov_base = base;
-                       vec->sge[sge_no].iov_len = len;
-                       sge_no++;
+/* One Write chunk is copied from Call transport header to Reply
+ * transport header. Each segment's length field is updated to
+ * reflect the number of bytes consumed in the segment.
+ *
+ * Returns number of segments in this chunk.
+ */
+static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+                                          unsigned int remaining)
+{
+       unsigned int i, nsegs;
+       u32 seg_len;
+
+       /* Write list discriminator */
+       *dst++ = *src++;
+
+       /* number of segments in this chunk */
+       nsegs = be32_to_cpup(src);
+       *dst++ = *src++;
+
+       for (i = nsegs; i; i--) {
+               /* segment's RDMA handle */
+               *dst++ = *src++;
+
+               /* bytes returned in this segment */
+               seg_len = be32_to_cpu(*src);
+               if (remaining >= seg_len) {
+                       /* entire segment was consumed */
+                       *dst = *src;
+                       remaining -= seg_len;
+               } else {
+                       /* segment only partly filled */
+                       *dst = cpu_to_be32(remaining);
+                       remaining = 0;
                }
-       }
+               dst++; src++;
 
-       dprintk("svcrdma: %s: sge_no %d page_no %d "
-               "page_base %u page_len %u head_len %zu tail_len %zu\n",
-               __func__, sge_no, page_no, xdr->page_base, xdr->page_len,
-               xdr->head[0].iov_len, xdr->tail[0].iov_len);
+               /* segment's RDMA offset */
+               *dst++ = *src++;
+               *dst++ = *src++;
+       }
 
-       vec->count = sge_no;
-       return 0;
+       return nsegs;
 }
 
-static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
-                             struct xdr_buf *xdr,
-                             u32 xdr_off, size_t len, int dir)
+/* The client provided a Write list in the Call message. Fill in
+ * the segments in the first Write chunk in the Reply's transport
+ * header with the number of bytes consumed in each segment.
+ * Remaining chunks are returned unused.
+ *
+ * Assumptions:
+ *  - Client has provided only one Write chunk
+ */
+static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
+                                          unsigned int consumed)
 {
-       struct page *page;
-       dma_addr_t dma_addr;
-       if (xdr_off < xdr->head[0].iov_len) {
-               /* This offset is in the head */
-               xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
-               page = virt_to_page(xdr->head[0].iov_base);
-       } else {
-               xdr_off -= xdr->head[0].iov_len;
-               if (xdr_off < xdr->page_len) {
-                       /* This offset is in the page list */
-                       xdr_off += xdr->page_base;
-                       page = xdr->pages[xdr_off >> PAGE_SHIFT];
-                       xdr_off &= ~PAGE_MASK;
-               } else {
-                       /* This offset is in the tail */
-                       xdr_off -= xdr->page_len;
-                       xdr_off += (unsigned long)
-                               xdr->tail[0].iov_base & ~PAGE_MASK;
-                       page = virt_to_page(xdr->tail[0].iov_base);
-               }
+       unsigned int nsegs;
+       __be32 *p, *q;
+
+       /* RPC-over-RDMA V1 replies never have a Read list. */
+       p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+
+       q = wr_ch;
+       while (*q != xdr_zero) {
+               nsegs = xdr_encode_write_chunk(p, q, consumed);
+               q += 2 + nsegs * rpcrdma_segment_maxsz;
+               p += 2 + nsegs * rpcrdma_segment_maxsz;
+               consumed = 0;
        }
-       dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
-                                  min_t(size_t, PAGE_SIZE, len), dir);
-       return dma_addr;
+
+       /* Terminate Write list */
+       *p++ = xdr_zero;
+
+       /* Reply chunk discriminator; may be replaced later */
+       *p = xdr_zero;
+}
+
+/* The client provided a Reply chunk in the Call message. Fill in
+ * the segments in the Reply chunk in the Reply message with the
+ * number of bytes consumed in each segment.
+ *
+ * Assumptions:
+ * - Reply can always fit in the provided Reply chunk
+ */
+static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
+                                           unsigned int consumed)
+{
+       __be32 *p;
+
+       /* Find the Reply chunk in the Reply's xprt header.
+        * RPC-over-RDMA V1 replies never have a Read list.
+        */
+       p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+
+       /* Skip past Write list */
+       while (*p++ != xdr_zero)
+               p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
+
+       xdr_encode_write_chunk(p, rp_ch, consumed);
 }
 
 /* Parse the RPC Call's transport header.
  */
-static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
-                                     struct rpcrdma_write_array **write,
-                                     struct rpcrdma_write_array **reply)
+static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
+                                     __be32 **write, __be32 **reply)
 {
        __be32 *p;
 
-       p = (__be32 *)&rmsgp->rm_body.rm_chunks[0];
+       p = rdma_argp + rpcrdma_fixed_maxsz;
 
        /* Read list */
        while (*p++ != xdr_zero)
@@ -169,7 +255,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
 
        /* Write list */
        if (*p != xdr_zero) {
-               *write = (struct rpcrdma_write_array *)p;
+               *write = p;
                while (*p++ != xdr_zero)
                        p += 1 + be32_to_cpu(*p) * 4;
        } else {
@@ -179,7 +265,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
 
        /* Reply chunk */
        if (*p != xdr_zero)
-               *reply = (struct rpcrdma_write_array *)p;
+               *reply = p;
        else
                *reply = NULL;
 }
@@ -189,360 +275,321 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
  * Invalidate, and responder chooses one rkey to invalidate.
  *
  * Find a candidate rkey to invalidate when sending a reply.  Picks the
- * first rkey it finds in the chunks lists.
+ * first R_key it finds in the chunk lists.
  *
  * Returns zero if RPC's chunk lists are empty.
  */
-static u32 svc_rdma_get_inv_rkey(struct rpcrdma_msg *rdma_argp,
-                                struct rpcrdma_write_array *wr_ary,
-                                struct rpcrdma_write_array *rp_ary)
+static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
+                                __be32 *wr_lst, __be32 *rp_ch)
 {
-       struct rpcrdma_read_chunk *rd_ary;
-       struct rpcrdma_segment *arg_ch;
+       __be32 *p;
 
-       rd_ary = (struct rpcrdma_read_chunk *)&rdma_argp->rm_body.rm_chunks[0];
-       if (rd_ary->rc_discrim != xdr_zero)
-               return be32_to_cpu(rd_ary->rc_target.rs_handle);
+       p = rdma_argp + rpcrdma_fixed_maxsz;
+       if (*p != xdr_zero)
+               p += 2;
+       else if (wr_lst && be32_to_cpup(wr_lst + 1))
+               p = wr_lst + 2;
+       else if (rp_ch && be32_to_cpup(rp_ch + 1))
+               p = rp_ch + 2;
+       else
+               return 0;
+       return be32_to_cpup(p);
+}
 
-       if (wr_ary && be32_to_cpu(wr_ary->wc_nchunks)) {
-               arg_ch = &wr_ary->wc_array[0].wc_target;
-               return be32_to_cpu(arg_ch->rs_handle);
-       }
+/* ib_dma_map_page() is used here because svc_rdma_unmap_dma()
+ * is used during completion to DMA-unmap this memory, and
+ * it uses ib_dma_unmap_page() exclusively.
+ */
+static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
+                               struct svc_rdma_op_ctxt *ctxt,
+                               unsigned int sge_no,
+                               unsigned char *base,
+                               unsigned int len)
+{
+       unsigned long offset = (unsigned long)base & ~PAGE_MASK;
+       struct ib_device *dev = rdma->sc_cm_id->device;
+       dma_addr_t dma_addr;
 
-       if (rp_ary && be32_to_cpu(rp_ary->wc_nchunks)) {
-               arg_ch = &rp_ary->wc_array[0].wc_target;
-               return be32_to_cpu(arg_ch->rs_handle);
-       }
+       dma_addr = ib_dma_map_page(dev, virt_to_page(base),
+                                  offset, len, DMA_TO_DEVICE);
+       if (ib_dma_mapping_error(dev, dma_addr))
+               return -EIO;
 
+       ctxt->sge[sge_no].addr = dma_addr;
+       ctxt->sge[sge_no].length = len;
+       ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
+       svc_rdma_count_mappings(rdma, ctxt);
        return 0;
 }
 
-/* Assumptions:
- * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
- */
-static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
-                     u32 rmr, u64 to,
-                     u32 xdr_off, int write_len,
-                     struct svc_rdma_req_map *vec)
+static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
+                                struct svc_rdma_op_ctxt *ctxt,
+                                unsigned int sge_no,
+                                struct page *page,
+                                unsigned int offset,
+                                unsigned int len)
 {
-       struct ib_rdma_wr write_wr;
-       struct ib_sge *sge;
-       int xdr_sge_no;
-       int sge_no;
-       int sge_bytes;
-       int sge_off;
-       int bc;
-       struct svc_rdma_op_ctxt *ctxt;
+       struct ib_device *dev = rdma->sc_cm_id->device;
+       dma_addr_t dma_addr;
 
-       if (vec->count > RPCSVC_MAXPAGES) {
-               pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
+       dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
+       if (ib_dma_mapping_error(dev, dma_addr))
                return -EIO;
-       }
 
-       dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
-               "write_len=%d, vec->sge=%p, vec->count=%lu\n",
-               rmr, (unsigned long long)to, xdr_off,
-               write_len, vec->sge, vec->count);
+       ctxt->sge[sge_no].addr = dma_addr;
+       ctxt->sge[sge_no].length = len;
+       ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
+       svc_rdma_count_mappings(rdma, ctxt);
+       return 0;
+}
 
-       ctxt = svc_rdma_get_context(xprt);
+/**
+ * svc_rdma_map_reply_hdr - DMA map the transport header buffer
+ * @rdma: controlling transport
+ * @ctxt: op_ctxt for the Send WR
+ * @rdma_resp: buffer containing transport header
+ * @len: length of transport header
+ *
+ * Returns:
+ *     %0 if the header is DMA mapped,
+ *     %-EIO if DMA mapping failed.
+ */
+int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
+                          struct svc_rdma_op_ctxt *ctxt,
+                          __be32 *rdma_resp,
+                          unsigned int len)
+{
        ctxt->direction = DMA_TO_DEVICE;
-       sge = ctxt->sge;
-
-       /* Find the SGE associated with xdr_off */
-       for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
-            xdr_sge_no++) {
-               if (vec->sge[xdr_sge_no].iov_len > bc)
-                       break;
-               bc -= vec->sge[xdr_sge_no].iov_len;
-       }
-
-       sge_off = bc;
-       bc = write_len;
-       sge_no = 0;
-
-       /* Copy the remaining SGE */
-       while (bc != 0) {
-               sge_bytes = min_t(size_t,
-                         bc, vec->sge[xdr_sge_no].iov_len-sge_off);
-               sge[sge_no].length = sge_bytes;
-               sge[sge_no].addr =
-                       dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
-                                   sge_bytes, DMA_TO_DEVICE);
-               xdr_off += sge_bytes;
-               if (ib_dma_mapping_error(xprt->sc_cm_id->device,
-                                        sge[sge_no].addr))
-                       goto err;
-               svc_rdma_count_mappings(xprt, ctxt);
-               sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
-               ctxt->count++;
-               sge_off = 0;
-               sge_no++;
-               xdr_sge_no++;
-               if (xdr_sge_no > vec->count) {
-                       pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
-                       goto err;
-               }
-               bc -= sge_bytes;
-               if (sge_no == xprt->sc_max_sge)
-                       break;
-       }
-
-       /* Prepare WRITE WR */
-       memset(&write_wr, 0, sizeof write_wr);
-       ctxt->cqe.done = svc_rdma_wc_write;
-       write_wr.wr.wr_cqe = &ctxt->cqe;
-       write_wr.wr.sg_list = &sge[0];
-       write_wr.wr.num_sge = sge_no;
-       write_wr.wr.opcode = IB_WR_RDMA_WRITE;
-       write_wr.wr.send_flags = IB_SEND_SIGNALED;
-       write_wr.rkey = rmr;
-       write_wr.remote_addr = to;
-
-       /* Post It */
-       atomic_inc(&rdma_stat_write);
-       if (svc_rdma_send(xprt, &write_wr.wr))
-               goto err;
-       return write_len - bc;
- err:
-       svc_rdma_unmap_dma(ctxt);
-       svc_rdma_put_context(ctxt, 0);
-       return -EIO;
+       ctxt->pages[0] = virt_to_page(rdma_resp);
+       ctxt->count = 1;
+       return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
 }
 
-noinline
-static int send_write_chunks(struct svcxprt_rdma *xprt,
-                            struct rpcrdma_write_array *wr_ary,
-                            struct rpcrdma_msg *rdma_resp,
-                            struct svc_rqst *rqstp,
-                            struct svc_rdma_req_map *vec)
+/* Load the xdr_buf into the ctxt's sge array, and DMA map each
+ * element as it is added.
+ *
+ * Returns the number of sge elements loaded on success, or
+ * a negative errno on failure.
+ */
+static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
+                                 struct svc_rdma_op_ctxt *ctxt,
+                                 struct xdr_buf *xdr, __be32 *wr_lst)
 {
-       u32 xfer_len = rqstp->rq_res.page_len;
-       int write_len;
-       u32 xdr_off;
-       int chunk_off;
-       int chunk_no;
-       int nchunks;
-       struct rpcrdma_write_array *res_ary;
+       unsigned int len, sge_no, remaining, page_off;
+       struct page **ppages;
+       unsigned char *base;
+       u32 xdr_pad;
        int ret;
 
-       res_ary = (struct rpcrdma_write_array *)
-               &rdma_resp->rm_body.rm_chunks[1];
-
-       /* Write chunks start at the pagelist */
-       nchunks = be32_to_cpu(wr_ary->wc_nchunks);
-       for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
-            xfer_len && chunk_no < nchunks;
-            chunk_no++) {
-               struct rpcrdma_segment *arg_ch;
-               u64 rs_offset;
-
-               arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
-               write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));
-
-               /* Prepare the response chunk given the length actually
-                * written */
-               xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
-               svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
-                                               arg_ch->rs_handle,
-                                               arg_ch->rs_offset,
-                                               write_len);
-               chunk_off = 0;
-               while (write_len) {
-                       ret = send_write(xprt, rqstp,
-                                        be32_to_cpu(arg_ch->rs_handle),
-                                        rs_offset + chunk_off,
-                                        xdr_off,
-                                        write_len,
-                                        vec);
-                       if (ret <= 0)
-                               goto out_err;
-                       chunk_off += ret;
-                       xdr_off += ret;
-                       xfer_len -= ret;
-                       write_len -= ret;
+       sge_no = 1;
+
+       ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++,
+                                  xdr->head[0].iov_base,
+                                  xdr->head[0].iov_len);
+       if (ret < 0)
+               return ret;
+
+       /* If a Write chunk is present, the xdr_buf's page list
+        * is not included inline. However the Upper Layer may
+        * have added XDR padding in the tail buffer, and that
+        * should not be included inline.
+        */
+       if (wr_lst) {
+               base = xdr->tail[0].iov_base;
+               len = xdr->tail[0].iov_len;
+               xdr_pad = xdr_padsize(xdr->page_len);
+
+               if (len && xdr_pad) {
+                       base += xdr_pad;
+                       len -= xdr_pad;
                }
+
+               goto tail;
+       }
+
+       ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+       page_off = xdr->page_base & ~PAGE_MASK;
+       remaining = xdr->page_len;
+       while (remaining) {
+               len = min_t(u32, PAGE_SIZE - page_off, remaining);
+
+               ret = svc_rdma_dma_map_page(rdma, ctxt, sge_no++,
+                                           *ppages++, page_off, len);
+               if (ret < 0)
+                       return ret;
+
+               remaining -= len;
+               page_off = 0;
        }
-       /* Update the req with the number of chunks actually used */
-       svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
 
-       return rqstp->rq_res.page_len;
+       base = xdr->tail[0].iov_base;
+       len = xdr->tail[0].iov_len;
+tail:
+       if (len) {
+               ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, base, len);
+               if (ret < 0)
+                       return ret;
+       }
 
-out_err:
-       pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
-       return -EIO;
+       return sge_no - 1;
 }
 
-noinline
-static int send_reply_chunks(struct svcxprt_rdma *xprt,
-                            struct rpcrdma_write_array *rp_ary,
-                            struct rpcrdma_msg *rdma_resp,
-                            struct svc_rqst *rqstp,
-                            struct svc_rdma_req_map *vec)
+/* The svc_rqst and all resources it owns are released as soon as
+ * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
+ * so they are released by the Send completion handler.
+ */
+static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
+                                  struct svc_rdma_op_ctxt *ctxt)
 {
-       u32 xfer_len = rqstp->rq_res.len;
-       int write_len;
-       u32 xdr_off;
-       int chunk_no;
-       int chunk_off;
-       int nchunks;
-       struct rpcrdma_segment *ch;
-       struct rpcrdma_write_array *res_ary;
-       int ret;
+       int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
 
-       /* XXX: need to fix when reply lists occur with read-list and or
-        * write-list */
-       res_ary = (struct rpcrdma_write_array *)
-               &rdma_resp->rm_body.rm_chunks[2];
-
-       /* xdr offset starts at RPC message */
-       nchunks = be32_to_cpu(rp_ary->wc_nchunks);
-       for (xdr_off = 0, chunk_no = 0;
-            xfer_len && chunk_no < nchunks;
-            chunk_no++) {
-               u64 rs_offset;
-               ch = &rp_ary->wc_array[chunk_no].wc_target;
-               write_len = min(xfer_len, be32_to_cpu(ch->rs_length));
-
-               /* Prepare the reply chunk given the length actually
-                * written */
-               xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
-               svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
-                                               ch->rs_handle, ch->rs_offset,
-                                               write_len);
-               chunk_off = 0;
-               while (write_len) {
-                       ret = send_write(xprt, rqstp,
-                                        be32_to_cpu(ch->rs_handle),
-                                        rs_offset + chunk_off,
-                                        xdr_off,
-                                        write_len,
-                                        vec);
-                       if (ret <= 0)
-                               goto out_err;
-                       chunk_off += ret;
-                       xdr_off += ret;
-                       xfer_len -= ret;
-                       write_len -= ret;
-               }
+       ctxt->count += pages;
+       for (i = 0; i < pages; i++) {
+               ctxt->pages[i + 1] = rqstp->rq_respages[i];
+               rqstp->rq_respages[i] = NULL;
        }
-       /* Update the req with the number of chunks actually used */
-       svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
+       rqstp->rq_next_page = rqstp->rq_respages + 1;
+}
 
-       return rqstp->rq_res.len;
+/**
+ * svc_rdma_post_send_wr - Set up and post one Send Work Request
+ * @rdma: controlling transport
+ * @ctxt: op_ctxt for transmitting the Send WR
+ * @num_sge: number of SGEs to send
+ * @inv_rkey: R_key argument to Send With Invalidate, or zero
+ *
+ * Returns:
+ *     %0 if the Send* was posted successfully,
+ *     %-ENOTCONN if the connection was lost or dropped,
+ *     %-EINVAL if there was a problem with the Send we built,
+ *     %-ENOMEM if ib_post_send failed.
+ */
+int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
+                         struct svc_rdma_op_ctxt *ctxt, int num_sge,
+                         u32 inv_rkey)
+{
+       struct ib_send_wr *send_wr = &ctxt->send_wr;
 
-out_err:
-       pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
-       return -EIO;
+       dprintk("svcrdma: posting Send WR with %u sge(s)\n", num_sge);
+
+       send_wr->next = NULL;
+       ctxt->cqe.done = svc_rdma_wc_send;
+       send_wr->wr_cqe = &ctxt->cqe;
+       send_wr->sg_list = ctxt->sge;
+       send_wr->num_sge = num_sge;
+       send_wr->send_flags = IB_SEND_SIGNALED;
+       if (inv_rkey) {
+               send_wr->opcode = IB_WR_SEND_WITH_INV;
+               send_wr->ex.invalidate_rkey = inv_rkey;
+       } else {
+               send_wr->opcode = IB_WR_SEND;
+       }
+
+       return svc_rdma_send(rdma, send_wr);
 }
 
-/* This function prepares the portion of the RPCRDMA message to be
- * sent in the RDMA_SEND. This function is called after data sent via
- * RDMA has already been transmitted. There are three cases:
- * - The RPCRDMA header, RPC header, and payload are all sent in a
- *   single RDMA_SEND. This is the "inline" case.
- * - The RPCRDMA header and some portion of the RPC header and data
- *   are sent via this RDMA_SEND and another portion of the data is
- *   sent via RDMA.
- * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
- *   header and data are all transmitted via RDMA.
- * In all three cases, this function prepares the RPCRDMA header in
- * sge[0], the 'type' parameter indicates the type to place in the
- * RPCRDMA header, and the 'byte_count' field indicates how much of
- * the XDR to include in this RDMA_SEND. NB: The offset of the payload
- * to send is zero in the XDR.
+/* Prepare the portion of the RPC Reply that will be transmitted
+ * via RDMA Send. The RPC-over-RDMA transport header is prepared
+ * in sge[0], and the RPC xdr_buf is prepared in following sges.
+ *
+ * Depending on whether a Write list or Reply chunk is present,
+ * the server may send all, a portion of, or none of the xdr_buf.
+ * In the latter case, only the transport header (sge[0]) is
+ * transmitted.
+ *
+ * RDMA Send is the last step of transmitting an RPC reply. Pages
+ * involved in the earlier RDMA Writes are here transferred out
+ * of the rqstp and into the ctxt's page array. These pages are
+ * DMA unmapped by each Write completion, but the subsequent Send
+ * completion finally releases these pages.
+ *
+ * Assumptions:
+ * - The Reply's transport header will never be larger than a page.
  */
-static int send_reply(struct svcxprt_rdma *rdma,
-                     struct svc_rqst *rqstp,
-                     struct page *page,
-                     struct rpcrdma_msg *rdma_resp,
-                     struct svc_rdma_req_map *vec,
-                     int byte_count,
-                     u32 inv_rkey)
+static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
+                                  __be32 *rdma_argp, __be32 *rdma_resp,
+                                  struct svc_rqst *rqstp,
+                                  __be32 *wr_lst, __be32 *rp_ch)
 {
        struct svc_rdma_op_ctxt *ctxt;
-       struct ib_send_wr send_wr;
-       u32 xdr_off;
-       int sge_no;
-       int sge_bytes;
-       int page_no;
-       int pages;
-       int ret = -EIO;
-
-       /* Prepare the context */
+       u32 inv_rkey;
+       int ret;
+
+       dprintk("svcrdma: sending %s reply: head=%zu, pagelen=%u, tail=%zu\n",
+               (rp_ch ? "RDMA_NOMSG" : "RDMA_MSG"),
+               rqstp->rq_res.head[0].iov_len,
+               rqstp->rq_res.page_len,
+               rqstp->rq_res.tail[0].iov_len);
+
        ctxt = svc_rdma_get_context(rdma);
-       ctxt->direction = DMA_TO_DEVICE;
-       ctxt->pages[0] = page;
-       ctxt->count = 1;
 
-       /* Prepare the SGE for the RPCRDMA Header */
-       ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
-       ctxt->sge[0].length =
-           svc_rdma_xdr_get_reply_hdr_len((__be32 *)rdma_resp);
-       ctxt->sge[0].addr =
-           ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
-                           ctxt->sge[0].length, DMA_TO_DEVICE);
-       if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
+       ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
+                                    svc_rdma_reply_hdr_len(rdma_resp));
+       if (ret < 0)
                goto err;
-       svc_rdma_count_mappings(rdma, ctxt);
-
-       ctxt->direction = DMA_TO_DEVICE;
 
-       /* Map the payload indicated by 'byte_count' */
-       xdr_off = 0;
-       for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
-               sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
-               byte_count -= sge_bytes;
-               ctxt->sge[sge_no].addr =
-                       dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
-                                   sge_bytes, DMA_TO_DEVICE);
-               xdr_off += sge_bytes;
-               if (ib_dma_mapping_error(rdma->sc_cm_id->device,
-                                        ctxt->sge[sge_no].addr))
+       if (!rp_ch) {
+               ret = svc_rdma_map_reply_msg(rdma, ctxt,
+                                            &rqstp->rq_res, wr_lst);
+               if (ret < 0)
                        goto err;
-               svc_rdma_count_mappings(rdma, ctxt);
-               ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
-               ctxt->sge[sge_no].length = sge_bytes;
        }
-       if (byte_count != 0) {
-               pr_err("svcrdma: Could not map %d bytes\n", byte_count);
+
+       svc_rdma_save_io_pages(rqstp, ctxt);
+
+       inv_rkey = 0;
+       if (rdma->sc_snd_w_inv)
+               inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
+       ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, inv_rkey);
+       if (ret)
                goto err;
-       }
 
-       /* Save all respages in the ctxt and remove them from the
-        * respages array. They are our pages until the I/O
-        * completes.
+       return 0;
+
+err:
+       pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
+       svc_rdma_unmap_dma(ctxt);
+       svc_rdma_put_context(ctxt, 1);
+       return ret;
+}
+
+/* Given the client-provided Write and Reply chunks, the server was not
+ * able to form a complete reply. Return an RDMA_ERROR message so the
+ * client can retire this RPC transaction. As above, the Send completion
+ * routine releases payload pages that were part of a previous RDMA Write.
+ *
+ * Remote Invalidation is skipped for simplicity.
+ */
+static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
+                                  __be32 *rdma_resp, struct svc_rqst *rqstp)
+{
+       struct svc_rdma_op_ctxt *ctxt;
+       __be32 *p;
+       int ret;
+
+       ctxt = svc_rdma_get_context(rdma);
+
+       /* Replace the original transport header with an
+        * RDMA_ERROR response. XID etc are preserved.
         */
-       pages = rqstp->rq_next_page - rqstp->rq_respages;
-       for (page_no = 0; page_no < pages; page_no++) {
-               ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
-               ctxt->count++;
-               rqstp->rq_respages[page_no] = NULL;
-       }
-       rqstp->rq_next_page = rqstp->rq_respages + 1;
+       p = rdma_resp + 3;
+       *p++ = rdma_error;
+       *p   = err_chunk;
 
-       if (sge_no > rdma->sc_max_sge) {
-               pr_err("svcrdma: Too many sges (%d)\n", sge_no);
+       ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, 20);
+       if (ret < 0)
                goto err;
-       }
-       memset(&send_wr, 0, sizeof send_wr);
-       ctxt->cqe.done = svc_rdma_wc_send;
-       send_wr.wr_cqe = &ctxt->cqe;
-       send_wr.sg_list = ctxt->sge;
-       send_wr.num_sge = sge_no;
-       if (inv_rkey) {
-               send_wr.opcode = IB_WR_SEND_WITH_INV;
-               send_wr.ex.invalidate_rkey = inv_rkey;
-       } else
-               send_wr.opcode = IB_WR_SEND;
-       send_wr.send_flags =  IB_SEND_SIGNALED;
 
-       ret = svc_rdma_send(rdma, &send_wr);
+       svc_rdma_save_io_pages(rqstp, ctxt);
+
+       ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, 0);
        if (ret)
                goto err;
 
        return 0;
 
- err:
+err:
+       pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
        svc_rdma_unmap_dma(ctxt);
        svc_rdma_put_context(ctxt, 1);
        return ret;
@@ -552,39 +599,36 @@ void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
 {
 }
 
+/**
+ * svc_rdma_sendto - Transmit an RPC reply
+ * @rqstp: processed RPC request, reply XDR already in ::rq_res
+ *
+ * Any resources still associated with @rqstp are released upon return.
+ * If no reply message was possible, the connection is closed.
+ *
+ * Returns:
+ *     %0 if an RPC reply has been successfully posted,
+ *     %-ENOMEM if a resource shortage occurred (connection is lost),
+ *     %-ENOTCONN if posting failed (connection is lost).
+ */
 int svc_rdma_sendto(struct svc_rqst *rqstp)
 {
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
-       struct rpcrdma_msg *rdma_argp;
-       struct rpcrdma_msg *rdma_resp;
-       struct rpcrdma_write_array *wr_ary, *rp_ary;
-       int ret;
-       int inline_bytes;
+       __be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
+       struct xdr_buf *xdr = &rqstp->rq_res;
        struct page *res_page;
-       struct svc_rdma_req_map *vec;
-       u32 inv_rkey;
-       __be32 *p;
-
-       dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
+       int ret;
 
-       /* Get the RDMA request header. The receive logic always
-        * places this at the start of page 0.
+       /* Find the call's chunk lists to decide how to send the reply.
+        * Receive places the Call's xprt header at the start of page 0.
         */
        rdma_argp = page_address(rqstp->rq_pages[0]);
-       svc_rdma_get_write_arrays(rdma_argp, &wr_ary, &rp_ary);
-
-       inv_rkey = 0;
-       if (rdma->sc_snd_w_inv)
-               inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_ary, rp_ary);
+       svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);
 
-       /* Build an req vec for the XDR */
-       vec = svc_rdma_get_req_map(rdma);
-       ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
-       if (ret)
-               goto err0;
-       inline_bytes = rqstp->rq_res.len;
+       dprintk("svcrdma: preparing response for XID 0x%08x\n",
+               be32_to_cpup(rdma_argp));
 
        /* Create the RDMA response header. xprt->xpt_mutex,
         * acquired in svc_send(), serializes RPC replies. The
@@ -598,115 +642,57 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
                goto err0;
        rdma_resp = page_address(res_page);
 
-       p = &rdma_resp->rm_xid;
-       *p++ = rdma_argp->rm_xid;
-       *p++ = rdma_argp->rm_vers;
+       p = rdma_resp;
+       *p++ = *rdma_argp;
+       *p++ = *(rdma_argp + 1);
        *p++ = rdma->sc_fc_credits;
-       *p++ = rp_ary ? rdma_nomsg : rdma_msg;
+       *p++ = rp_ch ? rdma_nomsg : rdma_msg;
 
        /* Start with empty chunks */
        *p++ = xdr_zero;
        *p++ = xdr_zero;
        *p   = xdr_zero;
 
-       /* Send any write-chunk data and build resp write-list */
-       if (wr_ary) {
-               ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
+       if (wr_lst) {
+               /* XXX: Presume the client sent only one Write chunk */
+               ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
                if (ret < 0)
-                       goto err1;
-               inline_bytes -= ret + xdr_padsize(ret);
+                       goto err2;
+               svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
        }
-
-       /* Send any reply-list data and update resp reply-list */
-       if (rp_ary) {
-               ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
+       if (rp_ch) {
+               ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
                if (ret < 0)
-                       goto err1;
-               inline_bytes -= ret;
+                       goto err2;
+               svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
        }
 
-       /* Post a fresh Receive buffer _before_ sending the reply */
        ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
        if (ret)
                goto err1;
-
-       ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec,
-                        inline_bytes, inv_rkey);
+       ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp,
+                                     wr_lst, rp_ch);
        if (ret < 0)
                goto err0;
+       return 0;
 
-       svc_rdma_put_req_map(rdma, vec);
-       dprintk("svcrdma: send_reply returns %d\n", ret);
-       return ret;
+ err2:
+       if (ret != -E2BIG)
+               goto err1;
+
+       ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
+       if (ret)
+               goto err1;
+       ret = svc_rdma_send_error_msg(rdma, rdma_resp, rqstp);
+       if (ret < 0)
+               goto err0;
+       return 0;
 
  err1:
        put_page(res_page);
  err0:
-       svc_rdma_put_req_map(rdma, vec);
        pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
               ret);
-       set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+       set_bit(XPT_CLOSE, &xprt->xpt_flags);
        return -ENOTCONN;
 }
-
-void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
-                        int status)
-{
-       struct ib_send_wr err_wr;
-       struct page *p;
-       struct svc_rdma_op_ctxt *ctxt;
-       enum rpcrdma_errcode err;
-       __be32 *va;
-       int length;
-       int ret;
-
-       ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
-       if (ret)
-               return;
-
-       p = alloc_page(GFP_KERNEL);
-       if (!p)
-               return;
-       va = page_address(p);
-
-       /* XDR encode an error reply */
-       err = ERR_CHUNK;
-       if (status == -EPROTONOSUPPORT)
-               err = ERR_VERS;
-       length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
-
-       ctxt = svc_rdma_get_context(xprt);
-       ctxt->direction = DMA_TO_DEVICE;
-       ctxt->count = 1;
-       ctxt->pages[0] = p;
-
-       /* Prepare SGE for local address */
-       ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
-       ctxt->sge[0].length = length;
-       ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
-                                           p, 0, length, DMA_TO_DEVICE);
-       if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
-               dprintk("svcrdma: Error mapping buffer for protocol error\n");
-               svc_rdma_put_context(ctxt, 1);
-               return;
-       }
-       svc_rdma_count_mappings(xprt, ctxt);
-
-       /* Prepare SEND WR */
-       memset(&err_wr, 0, sizeof(err_wr));
-       ctxt->cqe.done = svc_rdma_wc_send;
-       err_wr.wr_cqe = &ctxt->cqe;
-       err_wr.sg_list = ctxt->sge;
-       err_wr.num_sge = 1;
-       err_wr.opcode = IB_WR_SEND;
-       err_wr.send_flags = IB_SEND_SIGNALED;
-
-       /* Post It */
-       ret = svc_rdma_send(xprt, &err_wr);
-       if (ret) {
-               dprintk("svcrdma: Error %d posting send for protocol error\n",
-                       ret);
-               svc_rdma_unmap_dma(ctxt);
-               svc_rdma_put_context(ctxt, 1);
-       }
-}
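
The header walk in svc_rdma_reply_hdr_len() and the chunk encoders above rely on the fixed shape of an RPC-over-RDMA v1 reply header: four fixed words (XID, version, credits, proc), a Read-list discriminator, a Write list of chunks (each a discriminator, a segment count, and four words per segment), then a Reply-chunk discriminator. The following stand-alone sketch (userspace, not kernel code; it assumes rpcrdma_fixed_maxsz and rpcrdma_segment_maxsz are both 4, as this file uses them) builds a header with one single-segment Write chunk and measures it the same way:

#include <stdio.h>
#include <stdint.h>
#include <arpa/inet.h>  /* htonl/ntohl stand in for cpu_to_be32/be32_to_cpup */

#define FIXED_MAXSZ     4       /* xid, vers, credits, proc */
#define SEGMENT_MAXSZ   4       /* handle, length, 64-bit offset */

static unsigned int reply_hdr_len(const uint32_t *rdma_resp)
{
        const uint32_t *p = rdma_resp + FIXED_MAXSZ + 1; /* skip Read list */
        unsigned int nsegs;

        while (ntohl(*p++) != 0) {              /* walk the Write list */
                nsegs = ntohl(*p++);
                p += nsegs * SEGMENT_MAXSZ;
        }
        if (ntohl(*p++) != 0) {                 /* then the Reply chunk */
                nsegs = ntohl(*p++);
                p += nsegs * SEGMENT_MAXSZ;
        }
        return (const char *)p - (const char *)rdma_resp;
}

int main(void)
{
        uint32_t hdr[32] = { 0 };
        uint32_t *p = hdr + FIXED_MAXSZ;        /* fixed words left as zero */

        *p++ = htonl(0);                /* no Read list */
        *p++ = htonl(1);                /* Write list: one chunk ... */
        *p++ = htonl(1);                /* ... with one segment */
        *p++ = htonl(0xdeadbeef);       /* segment handle */
        *p++ = htonl(4096);             /* segment length */
        p += 2;                         /* 64-bit offset (already zero) */
        *p++ = htonl(0);                /* Write list terminator */
        *p++ = htonl(0);                /* no Reply chunk */

        /* 13 words: 4 fixed + 1 + (1 + 1 + 4) + 1 + 1 = 52 bytes */
        printf("header length: %u bytes\n", reply_hdr_len(hdr));
        return 0;
}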
index fc8f14c..a9d9cb1 100644 (file)
@@ -272,85 +272,6 @@ static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
        }
 }
 
-static struct svc_rdma_req_map *alloc_req_map(gfp_t flags)
-{
-       struct svc_rdma_req_map *map;
-
-       map = kmalloc(sizeof(*map), flags);
-       if (map)
-               INIT_LIST_HEAD(&map->free);
-       return map;
-}
-
-static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt)
-{
-       unsigned int i;
-
-       /* One for each receive buffer on this connection. */
-       i = xprt->sc_max_requests;
-
-       while (i--) {
-               struct svc_rdma_req_map *map;
-
-               map = alloc_req_map(GFP_KERNEL);
-               if (!map) {
-                       dprintk("svcrdma: No memory for request map\n");
-                       return false;
-               }
-               list_add(&map->free, &xprt->sc_maps);
-       }
-       return true;
-}
-
-struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt)
-{
-       struct svc_rdma_req_map *map = NULL;
-
-       spin_lock(&xprt->sc_map_lock);
-       if (list_empty(&xprt->sc_maps))
-               goto out_empty;
-
-       map = list_first_entry(&xprt->sc_maps,
-                              struct svc_rdma_req_map, free);
-       list_del_init(&map->free);
-       spin_unlock(&xprt->sc_map_lock);
-
-out:
-       map->count = 0;
-       return map;
-
-out_empty:
-       spin_unlock(&xprt->sc_map_lock);
-
-       /* Pre-allocation amount was incorrect */
-       map = alloc_req_map(GFP_NOIO);
-       if (map)
-               goto out;
-
-       WARN_ONCE(1, "svcrdma: empty request map list?\n");
-       return NULL;
-}
-
-void svc_rdma_put_req_map(struct svcxprt_rdma *xprt,
-                         struct svc_rdma_req_map *map)
-{
-       spin_lock(&xprt->sc_map_lock);
-       list_add(&map->free, &xprt->sc_maps);
-       spin_unlock(&xprt->sc_map_lock);
-}
-
-static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
-{
-       while (!list_empty(&xprt->sc_maps)) {
-               struct svc_rdma_req_map *map;
-
-               map = list_first_entry(&xprt->sc_maps,
-                                      struct svc_rdma_req_map, free);
-               list_del(&map->free);
-               kfree(map);
-       }
-}
-
 /* QP event handler */
 static void qp_event_handler(struct ib_event *event, void *context)
 {
@@ -473,24 +394,6 @@ void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
        svc_rdma_put_context(ctxt, 1);
 }
 
-/**
- * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
- * @cq:        completion queue
- * @wc:        completed WR
- *
- */
-void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
-{
-       struct ib_cqe *cqe = wc->wr_cqe;
-       struct svc_rdma_op_ctxt *ctxt;
-
-       svc_rdma_send_wc_common_put(cq, wc, "write");
-
-       ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
-       svc_rdma_unmap_dma(ctxt);
-       svc_rdma_put_context(ctxt, 0);
-}
-
 /**
  * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
  * @cq:        completion queue
@@ -561,14 +464,14 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
        INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
        INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
        INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
-       INIT_LIST_HEAD(&cma_xprt->sc_maps);
+       INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
        init_waitqueue_head(&cma_xprt->sc_send_wait);
 
        spin_lock_init(&cma_xprt->sc_lock);
        spin_lock_init(&cma_xprt->sc_rq_dto_lock);
        spin_lock_init(&cma_xprt->sc_frmr_q_lock);
        spin_lock_init(&cma_xprt->sc_ctxt_lock);
-       spin_lock_init(&cma_xprt->sc_map_lock);
+       spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
 
        /*
         * Note that this implies that the underlying transport support
@@ -999,6 +902,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
                newxprt, newxprt->sc_cm_id);
 
        dev = newxprt->sc_cm_id->device;
+       newxprt->sc_port_num = newxprt->sc_cm_id->port_num;
 
        /* Qualify the transport resource defaults with the
         * capabilities of this particular device */
@@ -1014,13 +918,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
                                            svcrdma_max_bc_requests);
        newxprt->sc_rq_depth = newxprt->sc_max_requests +
                               newxprt->sc_max_bc_requests;
-       newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth;
+       newxprt->sc_sq_depth = newxprt->sc_rq_depth;
        atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
 
        if (!svc_rdma_prealloc_ctxts(newxprt))
                goto errout;
-       if (!svc_rdma_prealloc_maps(newxprt))
-               goto errout;
 
        /*
         * Limit ORD based on client limit, local device limit, and
@@ -1050,6 +952,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
        memset(&qp_attr, 0, sizeof qp_attr);
        qp_attr.event_handler = qp_event_handler;
        qp_attr.qp_context = &newxprt->sc_xprt;
+       qp_attr.port_num = newxprt->sc_cm_id->port_num;
+       qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests;
        qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
        qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
        qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
@@ -1248,8 +1152,8 @@ static void __svc_rdma_free(struct work_struct *work)
        }
 
        rdma_dealloc_frmr_q(rdma);
+       svc_rdma_destroy_rw_ctxts(rdma);
        svc_rdma_destroy_ctxts(rdma);
-       svc_rdma_destroy_maps(rdma);
 
        /* Destroy the QP if present (not a listener) */
        if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
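
The qp_attr.cap.max_rdma_ctxs and sc_port_num additions above exist to serve the rdma_rw API that svc_rdma_rw.c now calls; max_rdma_ctxs tells the RDMA core how many read/write contexts may be in flight on the QP at once. As a reminder of that API's shape, a minimal sketch follows (assuming the 4.12-era rw.h signatures; illustrative only, not part of the patch):

/* Map a local SG list, chain the resulting RDMA Write WRs, and post
 * them. Completion is reported through @cqe; the completion handler
 * then calls rdma_rw_ctx_destroy() to unmap the pages.
 */
static int example_rdma_write(struct ib_qp *qp, u8 port_num,
                              struct rdma_rw_ctx *ctx, struct ib_cqe *cqe,
                              struct scatterlist *sgl, unsigned int nents,
                              u64 remote_offset, u32 rkey)
{
        struct ib_send_wr *first_wr, *bad_wr;
        int sqecount;

        sqecount = rdma_rw_ctx_init(ctx, qp, port_num, sgl, nents, 0,
                                    remote_offset, rkey, DMA_TO_DEVICE);
        if (sqecount < 0)
                return sqecount;

        /* The caller must reserve @sqecount Send Queue entries first,
         * which is what the sc_sq_avail accounting in
         * svc_rdma_post_chunk_ctxt() does before posting.
         */
        first_wr = rdma_rw_ctx_wrs(ctx, qp, port_num, cqe, NULL);
        return ib_post_send(qp, first_wr, &bad_wr);
}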