Merge tag 'for-linus' of git://github.com/openrisc/linux
[linux-2.6-microblaze.git] / fs / dlm / midcomms.c
index e3de268..7ae39ec 100644 (file)
  * compatibility. There exists better ways to make a better handling.
  * However this should be changed in the next major version bump of dlm.
  *
- * Ack handling:
- *
- * Currently we send an ack message for every dlm message. However we
- * can ack multiple dlm messages with one ack by just delaying the ack
- * message. Will reduce some traffic but makes the drop detection slower.
- *
  * Tail Size checking:
  *
  * There exists a message tail payload in e.g. DLM_MSG however we don't
@@ -169,6 +163,7 @@ struct midcomms_node {
 #define DLM_NODE_FLAG_CLOSE    1
 #define DLM_NODE_FLAG_STOP_TX  2
 #define DLM_NODE_FLAG_STOP_RX  3
+#define DLM_NODE_ULP_DELIVERED 4
        unsigned long flags;
        wait_queue_head_t shutdown_wait;
 
@@ -480,11 +475,12 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 {
        if (seq == node->seq_next) {
                node->seq_next++;
-               /* send ack before fin */
-               dlm_send_ack(node->nodeid, node->seq_next);
 
                switch (p->header.h_cmd) {
                case DLM_FIN:
+                       /* send ack before fin */
+                       dlm_send_ack(node->nodeid, node->seq_next);
+
                        spin_lock(&node->state_lock);
                        pr_debug("receive fin msg from node %d with state %s\n",
                                 node->nodeid, dlm_state_str(node->state));
@@ -534,6 +530,7 @@ static void dlm_midcomms_receive_buffer(union dlm_packet *p,
                default:
                        WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
                        dlm_receive_buffer(p, node->nodeid);
+                       set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
                        break;
                }
        } else {
@@ -933,6 +930,49 @@ int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
        return ret;
 }
 
+void dlm_midcomms_receive_done(int nodeid)
+{
+       struct midcomms_node *node;
+       int idx;
+
+       idx = srcu_read_lock(&nodes_srcu);
+       node = nodeid2node(nodeid, 0);
+       if (!node) {
+               srcu_read_unlock(&nodes_srcu, idx);
+               return;
+       }
+
+       /* old protocol, we do nothing */
+       switch (node->version) {
+       case DLM_VERSION_3_2:
+               break;
+       default:
+               srcu_read_unlock(&nodes_srcu, idx);
+               return;
+       }
+
+       /* do nothing if we didn't delivered stateful to ulp */
+       if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
+                               &node->flags)) {
+               srcu_read_unlock(&nodes_srcu, idx);
+               return;
+       }
+
+       spin_lock(&node->state_lock);
+       /* we only ack if state is ESTABLISHED */
+       switch (node->state) {
+       case DLM_ESTABLISHED:
+               spin_unlock(&node->state_lock);
+               dlm_send_ack(node->nodeid, node->seq_next);
+               break;
+       default:
+               spin_unlock(&node->state_lock);
+               /* do nothing FIN has it's own ack send */
+               break;
+       };
+       srcu_read_unlock(&nodes_srcu, idx);
+}
+
 void dlm_midcomms_unack_msg_resend(int nodeid)
 {
        struct midcomms_node *node;