[PATCH] RPC: expose API for serializing access to RPC transports
[linux-2.6-microblaze.git] / net / sunrpc / xprt.c
index 2d1e8b8..e92ea99 100644 (file)
@@ -153,14 +153,42 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
        return retval;
 }
 
-
 static void __xprt_lock_write_next(struct rpc_xprt *xprt)
+{
+       struct rpc_task *task;
+       struct rpc_rqst *req;
+
+       if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
+               return;
+
+       task = rpc_wake_up_next(&xprt->resend);
+       if (!task) {
+               task = rpc_wake_up_next(&xprt->sending);
+               if (!task)
+                       goto out_unlock;
+       }
+
+       req = task->tk_rqstp;
+       xprt->snd_task = task;
+       if (req) {
+               req->rq_bytes_sent = 0;
+               req->rq_ntrans++;
+       }
+       return;
+
+out_unlock:
+       smp_mb__before_clear_bit();
+       clear_bit(XPRT_LOCKED, &xprt->state);
+       smp_mb__after_clear_bit();
+}
+
+static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 {
        struct rpc_task *task;
 
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
-       if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
+       if (RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
@@ -168,7 +196,7 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
                if (!task)
                        goto out_unlock;
        }
-       if (xprt->nocong || __xprt_get_cong(xprt, task)) {
+       if (__xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
@@ -183,11 +211,14 @@ out_unlock:
        smp_mb__after_clear_bit();
 }
 
-/*
- * Releases the transport for use by other requests.
+/**
+ * xprt_release_xprt - allow other requests to use a transport
+ * @xprt: transport with other tasks potentially waiting
+ * @task: task that is releasing access to the transport
+ *
+ * Note that "task" can be NULL.  No congestion control is provided.
  */
-static void
-__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
+void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        if (xprt->snd_task == task) {
                xprt->snd_task = NULL;
@@ -198,11 +229,29 @@ __xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
        }
 }
 
-static inline void
-xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
+/**
+ * xprt_release_xprt_cong - allow other requests to use a transport
+ * @xprt: transport with other tasks potentially waiting
+ * @task: task that is releasing access to the transport
+ *
+ * Note that "task" can be NULL.  Another task is awoken to use the
+ * transport if the transport's congestion window allows it.
+ */
+void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+       if (xprt->snd_task == task) {
+               xprt->snd_task = NULL;
+               smp_mb__before_clear_bit();
+               clear_bit(XPRT_LOCKED, &xprt->state);
+               smp_mb__after_clear_bit();
+               __xprt_lock_write_next_cong(xprt);
+       }
+}
+
+static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        spin_lock_bh(&xprt->transport_lock);
-       __xprt_release_write(xprt, task);
+       xprt->ops->release_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
 }
 
@@ -237,7 +286,7 @@ __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
-       __xprt_lock_write_next(xprt);
+       __xprt_lock_write_next_cong(xprt);
 }
 
 /*
@@ -256,7 +305,7 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
-               __xprt_lock_write_next(xprt);
+               __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
@@ -693,7 +742,7 @@ void xprt_transmit(struct rpc_task *task)
                        task->tk_status = -ENOTCONN;
                else if (!req->rq_received)
                        rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
-               __xprt_release_write(xprt, task);
+               xprt->ops->release_xprt(xprt, task);
                spin_unlock_bh(&xprt->transport_lock);
                return;
        }
@@ -792,7 +841,7 @@ void xprt_release(struct rpc_task *task)
        if (!(req = task->tk_rqstp))
                return;
        spin_lock_bh(&xprt->transport_lock);
-       __xprt_release_write(xprt, task);
+       xprt->ops->release_xprt(xprt, task);
        __xprt_put_cong(xprt, req);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);