diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index bbd6155..2e192ba 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -200,9 +200,9 @@ xprt_rdma_free_addresses(struct rpc_xprt *xprt)
 static void
 xprt_rdma_connect_worker(struct work_struct *work)
 {
-       struct rpcrdma_xprt *r_xprt =
-               container_of(work, struct rpcrdma_xprt, rdma_connect.work);
-       struct rpc_xprt *xprt = &r_xprt->xprt;
+       struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
+                                                  rx_connect_worker.work);
+       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        int rc = 0;
 
        xprt_clear_connected(xprt);
@@ -235,7 +235,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
 
        dprintk("RPC:       %s: called\n", __func__);
 
-       cancel_delayed_work_sync(&r_xprt->rdma_connect);
+       cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
 
        xprt_clear_connected(xprt);
 
@@ -364,8 +364,7 @@ xprt_setup_rdma(struct xprt_create *args)
         * any inline data. Also specify any padding which will be provided
         * from a preregistered zero buffer.
         */
-       rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
-                               &new_xprt->rx_data);
+       rc = rpcrdma_buffer_create(new_xprt);
        if (rc)
                goto out3;
 
@@ -374,9 +373,8 @@ xprt_setup_rdma(struct xprt_create *args)
         * connection loss notification is async. We also catch connection loss
         * when reaping receives.
         */
-       INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
-       new_ep->rep_func = rpcrdma_conn_func;
-       new_ep->rep_xprt = xprt;
+       INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
+                         xprt_rdma_connect_worker);
 
        xprt_rdma_format_addresses(xprt);
        xprt->max_payload = rpcrdma_max_payload(new_xprt);
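
The hunks above rename the delayed work item from rdma_connect to rx_connect_worker and drop the rep_func/rep_xprt back-pointers; the connect worker now locates its transport purely through container_of(). The following is a minimal sketch of that delayed-work pattern; the my_* names are invented for illustration and are not part of xprtrdma.

	/*
	 * Illustrative sketch only: my_xprt and my_connect_worker are invented
	 * names. The pattern is the one the hunks above rely on -- embed a
	 * delayed_work in the transport structure and let the worker recover
	 * that structure with container_of(), so no separate back-pointer
	 * (such as the removed rep_xprt) is needed.
	 */
	#include <linux/kernel.h>
	#include <linux/workqueue.h>

	struct my_xprt {
		int			my_connected;
		struct delayed_work	my_connect_worker;
	};

	static void my_connect_worker_fn(struct work_struct *work)
	{
		/* "work" is &my_connect_worker.work, so walk back out to the
		 * enclosing my_xprt, as xprt_rdma_connect_worker() does. */
		struct my_xprt *xprt = container_of(work, struct my_xprt,
						    my_connect_worker.work);

		xprt->my_connected = 1;
	}

	static void my_xprt_setup(struct my_xprt *xprt)
	{
		INIT_DELAYED_WORK(&xprt->my_connect_worker, my_connect_worker_fn);
	}

	static void my_xprt_connect(struct my_xprt *xprt, unsigned long delay)
	{
		/* Zero delay for a first connect, a backed-off delay for a
		 * reconnect; the worker runs asynchronously. */
		schedule_delayed_work(&xprt->my_connect_worker, delay);
	}

	static void my_xprt_destroy(struct my_xprt *xprt)
	{
		/* Wait for any pending connect attempt before tearing down,
		 * as xprt_rdma_destroy() does with cancel_delayed_work_sync(). */
		cancel_delayed_work_sync(&xprt->my_connect_worker);
	}
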
@@ -434,94 +432,101 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 
        if (r_xprt->rx_ep.rep_connected != 0) {
                /* Reconnect */
-               schedule_delayed_work(&r_xprt->rdma_connect,
-                       xprt->reestablish_timeout);
+               schedule_delayed_work(&r_xprt->rx_connect_worker,
+                                     xprt->reestablish_timeout);
                xprt->reestablish_timeout <<= 1;
                if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
                        xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
                else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
                        xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
        } else {
-               schedule_delayed_work(&r_xprt->rdma_connect, 0);
+               schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
                if (!RPC_IS_ASYNC(task))
-                       flush_delayed_work(&r_xprt->rdma_connect);
+                       flush_delayed_work(&r_xprt->rx_connect_worker);
        }
 }
 
 /*
  * The RDMA allocate/free functions need the task structure as a place
  * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence. For this reason, the recv buffers are attached to send
- * buffers for portions of the RPC. Note that the RPC layer allocates
- * both send and receive buffers in the same call. We may register
- * the receive buffer portion when using reply chunks.
+ * sequence.
+ *
+ * The RPC layer allocates both send and receive buffers in the same call
+ * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
+ * We may register rq_rcv_buf when using reply chunks.
  */
 static void *
 xprt_rdma_allocate(struct rpc_task *task, size_t size)
 {
        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
-       struct rpcrdma_req *req, *nreq;
+       struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+       struct rpcrdma_regbuf *rb;
+       struct rpcrdma_req *req;
+       size_t min_size;
+       gfp_t flags;
 
-       req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
+       req = rpcrdma_buffer_get(&r_xprt->rx_buf);
        if (req == NULL)
                return NULL;
 
-       if (size > req->rl_size) {
-               dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
-                       "prog %d vers %d proc %d\n",
-                       __func__, size, req->rl_size,
-                       task->tk_client->cl_prog, task->tk_client->cl_vers,
-                       task->tk_msg.rpc_proc->p_proc);
-               /*
-                * Outgoing length shortage. Our inline write max must have
-                * been configured to perform direct i/o.
-                *
-                * This is therefore a large metadata operation, and the
-                * allocate call was made on the maximum possible message,
-                * e.g. containing long filename(s) or symlink data. In
-                * fact, while these metadata operations *might* carry
-                * large outgoing payloads, they rarely *do*. However, we
-                * have to commit to the request here, so reallocate and
-                * register it now. The data path will never require this
-                * reallocation.
-                *
-                * If the allocation or registration fails, the RPC framework
-                * will (doggedly) retry.
-                */
-               if (task->tk_flags & RPC_TASK_SWAPPER)
-                       nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
-               else
-                       nreq = kmalloc(sizeof *req + size, GFP_NOFS);
-               if (nreq == NULL)
-                       goto outfail;
-
-               if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
-                               nreq->rl_base, size + sizeof(struct rpcrdma_req)
-                               - offsetof(struct rpcrdma_req, rl_base),
-                               &nreq->rl_handle, &nreq->rl_iov)) {
-                       kfree(nreq);
-                       goto outfail;
-               }
-               rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
-               nreq->rl_size = size;
-               nreq->rl_niovs = 0;
-               nreq->rl_nchunks = 0;
-               nreq->rl_buffer = (struct rpcrdma_buffer *)req;
-               nreq->rl_reply = req->rl_reply;
-               memcpy(nreq->rl_segments,
-                       req->rl_segments, sizeof nreq->rl_segments);
-               /* flag the swap with an unused field */
-               nreq->rl_iov.length = 0;
-               req->rl_reply = NULL;
-               req = nreq;
-       }
+       flags = GFP_NOIO | __GFP_NOWARN;
+       if (RPC_IS_SWAPPER(task))
+               flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
+
+       if (req->rl_rdmabuf == NULL)
+               goto out_rdmabuf;
+       if (req->rl_sendbuf == NULL)
+               goto out_sendbuf;
+       if (size > req->rl_sendbuf->rg_size)
+               goto out_sendbuf;
+
+out:
        dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
        req->rl_connect_cookie = 0;     /* our reserved value */
-       return req->rl_xdr_buf;
-
-outfail:
+       return req->rl_sendbuf->rg_base;
+
+out_rdmabuf:
+       min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+       rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
+       if (IS_ERR(rb))
+               goto out_fail;
+       req->rl_rdmabuf = rb;
+
+out_sendbuf:
+       /* XDR encoding and RPC/RDMA marshaling of this request has not
+        * yet occurred. Thus a lower bound is needed to prevent buffer
+        * overrun during marshaling.
+        *
+        * RPC/RDMA marshaling may choose to send payload bearing ops
+        * inline, if the result is smaller than the inline threshold.
+        * The value of the "size" argument accounts for header
+        * requirements but not for the payload in these cases.
+        *
+        * Likewise, allocate enough space to receive a reply up to the
+        * size of the inline threshold.
+        *
+        * It's unlikely that both the send header and the received
+        * reply will be large, but slush is provided here to allow
+        * flexibility when marshaling.
+        */
+       min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
+       min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+       if (size < min_size)
+               size = min_size;
+
+       rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+       if (IS_ERR(rb))
+               goto out_fail;
+       rb->rg_owner = req;
+
+       r_xprt->rx_stats.hardway_register_count += size;
+       rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+       req->rl_sendbuf = rb;
+       goto out;
+
+out_fail:
        rpcrdma_buffer_put(req);
-       rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+       r_xprt->rx_stats.failed_marshal_count++;
        return NULL;
 }
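
The rewritten xprt_rdma_allocate() above allocates the RDMA header buffer and the send buffer lazily, and always sizes the send buffer to at least the sum of the inline read and write thresholds so later marshaling cannot overrun it. Below is a compact sketch of just that sizing and GFP-flag policy; the EXAMPLE_* thresholds are made-up stand-ins for RPCRDMA_INLINE_READ_THRESHOLD()/RPCRDMA_INLINE_WRITE_THRESHOLD().

	/*
	 * Sketch only: the EXAMPLE_* values are invented; in xprtrdma the
	 * thresholds come from the connection's inline limits. The logic
	 * mirrors the out_sendbuf path and the flag selection above.
	 */
	#include <linux/gfp.h>
	#include <linux/types.h>

	#define EXAMPLE_INLINE_READ	1024	/* hypothetical inline read threshold */
	#define EXAMPLE_INLINE_WRITE	1024	/* hypothetical inline write threshold */

	static size_t example_sendbuf_size(size_t requested)
	{
		/* Marshaling has not happened yet, so reserve room for an
		 * inline-sized call header plus an inline-sized reply. */
		size_t min_size = EXAMPLE_INLINE_READ + EXAMPLE_INLINE_WRITE;

		return requested < min_size ? min_size : requested;
	}

	static gfp_t example_alloc_flags(bool is_swapper)
	{
		/* A swap-out request must not sleep waiting for reclaim, so
		 * it dips into the emergency reserves rather than GFP_NOIO. */
		if (is_swapper)
			return __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
		return GFP_NOIO | __GFP_NOWARN;
	}
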
 
@@ -533,47 +538,24 @@ xprt_rdma_free(void *buffer)
 {
        struct rpcrdma_req *req;
        struct rpcrdma_xprt *r_xprt;
-       struct rpcrdma_rep *rep;
+       struct rpcrdma_regbuf *rb;
        int i;
 
        if (buffer == NULL)
                return;
 
-       req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
-       if (req->rl_iov.length == 0) {  /* see allocate above */
-               r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
-                                     struct rpcrdma_xprt, rx_buf);
-       } else
-               r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
-       rep = req->rl_reply;
+       rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
+       req = rb->rg_owner;
+       r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
 
-       dprintk("RPC:       %s: called on 0x%p%s\n",
-               __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
+       dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-       /*
-        * Finish the deregistration.  The process is considered
-        * complete when the rr_func vector becomes NULL - this
-        * was put in place during rpcrdma_reply_handler() - the wait
-        * call below will not block if the dereg is "done". If
-        * interrupted, our framework will clean up.
-        */
        for (i = 0; req->rl_nchunks;) {
                --req->rl_nchunks;
                i += rpcrdma_deregister_external(
                        &req->rl_segments[i], r_xprt);
        }
 
-       if (req->rl_iov.length == 0) {  /* see allocate above */
-               struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
-               oreq->rl_reply = req->rl_reply;
-               (void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
-                                                  req->rl_handle,
-                                                  &req->rl_iov);
-               kfree(req);
-               req = oreq;
-       }
-
-       /* Put back request+reply buffers */
        rpcrdma_buffer_put(req);
 }