libceph: handle discarding acked and requeued messages separately
authorIlya Dryomov <idryomov@gmail.com>
Tue, 13 Oct 2020 15:38:53 +0000 (17:38 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 14 Dec 2020 22:21:49 +0000 (23:21 +0100)
Make it easier to follow and remove dependency on msgr1 specific
CEPH_MSGR_TAG_SEQ.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
net/ceph/messenger.c

index 29b00b2..922fa15 100644 (file)
@@ -760,6 +760,56 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
        return ret;
 }
 
+/*
+ * Discard messages that have been acked by the server.
+ */
+static void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
+{
+       struct ceph_msg *msg;
+       u64 seq;
+
+       dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq);
+       while (!list_empty(&con->out_sent)) {
+               msg = list_first_entry(&con->out_sent, struct ceph_msg,
+                                      list_head);
+               WARN_ON(msg->needs_out_seq);
+               seq = le64_to_cpu(msg->hdr.seq);
+               if (seq > ack_seq)
+                       break;
+
+               dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
+                    msg, seq);
+               ceph_msg_remove(msg);
+       }
+}
+
+/*
+ * Discard messages that have been requeued in con_fault(), up to
+ * reconnect_seq.  This avoids gratuitously resending messages that
+ * the server had received and handled prior to reconnect.
+ */
+static void ceph_con_discard_requeued(struct ceph_connection *con,
+                                     u64 reconnect_seq)
+{
+       struct ceph_msg *msg;
+       u64 seq;
+
+       dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
+       while (!list_empty(&con->out_queue)) {
+               msg = list_first_entry(&con->out_queue, struct ceph_msg,
+                                      list_head);
+               if (msg->needs_out_seq)
+                       break;
+               seq = le64_to_cpu(msg->hdr.seq);
+               if (seq > reconnect_seq)
+                       break;
+
+               dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
+                    msg, seq);
+               ceph_msg_remove(msg);
+       }
+}
+
 static void con_out_kvec_reset(struct ceph_connection *con)
 {
        BUG_ON(con->out_skip);
@@ -2259,28 +2309,12 @@ static int read_partial_ack(struct ceph_connection *con)
  */
 static void process_ack(struct ceph_connection *con)
 {
-       struct ceph_msg *m;
        u64 ack = le64_to_cpu(con->in_temp_ack);
-       u64 seq;
-       bool reconnect = (con->in_tag == CEPH_MSGR_TAG_SEQ);
-       struct list_head *list = reconnect ? &con->out_queue : &con->out_sent;
 
-       /*
-        * In the reconnect case, con_fault() has requeued messages
-        * in out_sent. We should cleanup old messages according to
-        * the reconnect seq.
-        */
-       while (!list_empty(list)) {
-               m = list_first_entry(list, struct ceph_msg, list_head);
-               if (reconnect && m->needs_out_seq)
-                       break;
-               seq = le64_to_cpu(m->hdr.seq);
-               if (seq > ack)
-                       break;
-               dout("got ack for seq %llu type %d at %p\n", seq,
-                    le16_to_cpu(m->hdr.type), m);
-               ceph_msg_remove(m);
-       }
+       if (con->in_tag == CEPH_MSGR_TAG_ACK)
+               ceph_con_discard_sent(con, ack);
+       else
+               ceph_con_discard_requeued(con, ack);
 
        prepare_read_tag(con);
 }