nfsd: document callback_wq serialization of callback code
[linux-2.6-microblaze.git] / fs / nfsd / nfs4callback.c
index 397eb78..24534db 100644 (file)
@@ -512,11 +512,9 @@ static int nfs4_xdr_dec_cb_recall(struct rpc_rqst *rqstp,
        if (unlikely(status))
                return status;
 
-       if (cb != NULL) {
-               status = decode_cb_sequence4res(xdr, cb);
-               if (unlikely(status || cb->cb_seq_status))
-                       return status;
-       }
+       status = decode_cb_sequence4res(xdr, cb);
+       if (unlikely(status || cb->cb_seq_status))
+               return status;
 
        return decode_cb_op_status(xdr, OP_CB_RECALL, &cb->cb_status);
 }
@@ -604,11 +602,10 @@ static int nfs4_xdr_dec_cb_layout(struct rpc_rqst *rqstp,
        if (unlikely(status))
                return status;
 
-       if (cb) {
-               status = decode_cb_sequence4res(xdr, cb);
-               if (unlikely(status || cb->cb_seq_status))
-                       return status;
-       }
+       status = decode_cb_sequence4res(xdr, cb);
+       if (unlikely(status || cb->cb_seq_status))
+               return status;
+
        return decode_cb_op_status(xdr, OP_CB_LAYOUTRECALL, &cb->cb_status);
 }
 #endif /* CONFIG_NFSD_PNFS */
@@ -663,11 +660,10 @@ static int nfs4_xdr_dec_cb_notify_lock(struct rpc_rqst *rqstp,
        if (unlikely(status))
                return status;
 
-       if (cb) {
-               status = decode_cb_sequence4res(xdr, cb);
-               if (unlikely(status || cb->cb_seq_status))
-                       return status;
-       }
+       status = decode_cb_sequence4res(xdr, cb);
+       if (unlikely(status || cb->cb_seq_status))
+               return status;
+
        return decode_cb_op_status(xdr, OP_CB_NOTIFY_LOCK, &cb->cb_status);
 }
 
@@ -759,11 +755,10 @@ static int nfs4_xdr_dec_cb_offload(struct rpc_rqst *rqstp,
        if (unlikely(status))
                return status;
 
-       if (cb) {
-               status = decode_cb_sequence4res(xdr, cb);
-               if (unlikely(status || cb->cb_seq_status))
-                       return status;
-       }
+       status = decode_cb_sequence4res(xdr, cb);
+       if (unlikely(status || cb->cb_seq_status))
+               return status;
+
        return decode_cb_op_status(xdr, OP_CB_OFFLOAD, &cb->cb_status);
 }
 /*
@@ -831,6 +826,31 @@ static int max_cb_time(struct net *net)
        return max(nn->nfsd4_lease/10, (time_t)1) * HZ;
 }
 
+static struct workqueue_struct *callback_wq;
+
+static bool nfsd4_queue_cb(struct nfsd4_callback *cb)
+{
+       return queue_work(callback_wq, &cb->cb_work);
+}
+
+static void nfsd41_cb_inflight_begin(struct nfs4_client *clp)
+{
+       atomic_inc(&clp->cl_cb_inflight);
+}
+
+static void nfsd41_cb_inflight_end(struct nfs4_client *clp)
+{
+
+       if (atomic_dec_and_test(&clp->cl_cb_inflight))
+               wake_up_var(&clp->cl_cb_inflight);
+}
+
+static void nfsd41_cb_inflight_wait_complete(struct nfs4_client *clp)
+{
+       wait_var_event(&clp->cl_cb_inflight,
+                       !atomic_read(&clp->cl_cb_inflight));
+}
+
 static const struct cred *get_backchannel_cred(struct nfs4_client *clp, struct rpc_clnt *client, struct nfsd4_session *ses)
 {
        if (clp->cl_minorversion == 0) {
@@ -942,14 +962,21 @@ static void nfsd4_cb_probe_done(struct rpc_task *task, void *calldata)
                clp->cl_cb_state = NFSD4_CB_UP;
 }
 
+static void nfsd4_cb_probe_release(void *calldata)
+{
+       struct nfs4_client *clp = container_of(calldata, struct nfs4_client, cl_cb_null);
+
+       nfsd41_cb_inflight_end(clp);
+
+}
+
 static const struct rpc_call_ops nfsd4_cb_probe_ops = {
        /* XXX: release method to ensure we set the cb channel down if
         * necessary on early failure? */
        .rpc_call_done = nfsd4_cb_probe_done,
+       .rpc_release = nfsd4_cb_probe_release,
 };
 
-static struct workqueue_struct *callback_wq;
-
 /*
  * Poke the callback thread to process any updates to the callback
  * parameters, and send a null probe.
@@ -980,9 +1007,12 @@ void nfsd4_change_callback(struct nfs4_client *clp, struct nfs4_cb_conn *conn)
  * If the slot is available, then mark it busy.  Otherwise, set the
  * thread for sleeping on the callback RPC wait queue.
  */
-static bool nfsd41_cb_get_slot(struct nfs4_client *clp, struct rpc_task *task)
+static bool nfsd41_cb_get_slot(struct nfsd4_callback *cb, struct rpc_task *task)
 {
-       if (test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
+       struct nfs4_client *clp = cb->cb_clp;
+
+       if (!cb->cb_holds_slot &&
+           test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
                rpc_sleep_on(&clp->cl_cb_waitq, task, NULL);
                /* Race breaker */
                if (test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
@@ -991,9 +1021,31 @@ static bool nfsd41_cb_get_slot(struct nfs4_client *clp, struct rpc_task *task)
                }
                rpc_wake_up_queued_task(&clp->cl_cb_waitq, task);
        }
+       cb->cb_holds_slot = true;
        return true;
 }
 
+static void nfsd41_cb_release_slot(struct nfsd4_callback *cb)
+{
+       struct nfs4_client *clp = cb->cb_clp;
+
+       if (cb->cb_holds_slot) {
+               cb->cb_holds_slot = false;
+               clear_bit(0, &clp->cl_cb_slot_busy);
+               rpc_wake_up_next(&clp->cl_cb_waitq);
+       }
+}
+
+static void nfsd41_destroy_cb(struct nfsd4_callback *cb)
+{
+       struct nfs4_client *clp = cb->cb_clp;
+
+       nfsd41_cb_release_slot(cb);
+       if (cb->cb_ops && cb->cb_ops->release)
+               cb->cb_ops->release(cb);
+       nfsd41_cb_inflight_end(clp);
+}
+
 /*
  * TODO: cb_sequence should support referring call lists, cachethis, multiple
  * slots, and mark callback channel down on communication errors.
@@ -1010,11 +1062,8 @@ static void nfsd4_cb_prepare(struct rpc_task *task, void *calldata)
         */
        cb->cb_seq_status = 1;
        cb->cb_status = 0;
-       if (minorversion) {
-               if (!cb->cb_holds_slot && !nfsd41_cb_get_slot(clp, task))
-                       return;
-               cb->cb_holds_slot = true;
-       }
+       if (minorversion && !nfsd41_cb_get_slot(cb, task))
+               return;
        rpc_call_start(task);
 }
 
@@ -1077,13 +1126,12 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
                }
                break;
        default:
+               nfsd4_mark_cb_fault(cb->cb_clp, cb->cb_seq_status);
                dprintk("%s: unprocessed error %d\n", __func__,
                        cb->cb_seq_status);
        }
 
-       cb->cb_holds_slot = false;
-       clear_bit(0, &clp->cl_cb_slot_busy);
-       rpc_wake_up_next(&clp->cl_cb_waitq);
+       nfsd41_cb_release_slot(cb);
        dprintk("%s: freed slot, new seqid=%d\n", __func__,
                clp->cl_cb_session->se_cb_seq_nr);
 
@@ -1096,8 +1144,10 @@ retry_nowait:
                ret = false;
        goto out;
 need_restart:
-       task->tk_status = 0;
-       cb->cb_need_restart = true;
+       if (!test_bit(NFSD4_CLIENT_CB_KILL, &clp->cl_flags)) {
+               task->tk_status = 0;
+               cb->cb_need_restart = true;
+       }
        return false;
 }
 
@@ -1139,9 +1189,9 @@ static void nfsd4_cb_release(void *calldata)
        struct nfsd4_callback *cb = calldata;
 
        if (cb->cb_need_restart)
-               nfsd4_run_cb(cb);
+               nfsd4_queue_cb(cb);
        else
-               cb->cb_ops->release(cb);
+               nfsd41_destroy_cb(cb);
 
 }
 
@@ -1175,6 +1225,7 @@ void nfsd4_shutdown_callback(struct nfs4_client *clp)
         */
        nfsd4_run_cb(&clp->cl_cb_null);
        flush_workqueue(callback_wq);
+       nfsd41_cb_inflight_wait_complete(clp);
 }
 
 /* requires cl_lock: */
@@ -1192,6 +1243,12 @@ static struct nfsd4_conn * __nfsd4_find_backchannel(struct nfs4_client *clp)
        return NULL;
 }
 
+/*
+ * Note there isn't a lot of locking in this code; instead we depend on
+ * the fact that it is run from the callback_wq, which won't run two
+ * work items at once.  So, for example, callback_wq handles all access
+ * of cl_cb_client and all calls to rpc_create or rpc_shutdown_client.
+ */
 static void nfsd4_process_cb_update(struct nfsd4_callback *cb)
 {
        struct nfs4_cb_conn conn;
@@ -1260,8 +1317,7 @@ nfsd4_run_cb_work(struct work_struct *work)
        clnt = clp->cl_cb_client;
        if (!clnt) {
                /* Callback channel broken, or client killed; give up: */
-               if (cb->cb_ops && cb->cb_ops->release)
-                       cb->cb_ops->release(cb);
+               nfsd41_destroy_cb(cb);
                return;
        }
 
@@ -1270,6 +1326,7 @@ nfsd4_run_cb_work(struct work_struct *work)
         */
        if (!cb->cb_ops && clp->cl_minorversion) {
                clp->cl_cb_state = NFSD4_CB_UP;
+               nfsd41_destroy_cb(cb);
                return;
        }
 
@@ -1295,5 +1352,9 @@ void nfsd4_init_cb(struct nfsd4_callback *cb, struct nfs4_client *clp,
 
 void nfsd4_run_cb(struct nfsd4_callback *cb)
 {
-       queue_work(callback_wq, &cb->cb_work);
+       struct nfs4_client *clp = cb->cb_clp;
+
+       nfsd41_cb_inflight_begin(clp);
+       if (!nfsd4_queue_cb(cb))
+               nfsd41_cb_inflight_end(clp);
 }