libceph: factor out ceph_con_get_out_msg()
authorIlya Dryomov <idryomov@gmail.com>
Wed, 18 Nov 2020 15:37:14 +0000 (16:37 +0100)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 14 Dec 2020 22:21:49 +0000 (23:21 +0100)
Move the logic of grabbing the next message from the queue into its own
function.  Like ceph_con_in_msg_alloc(), this is protocol independent.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
net/ceph/messenger.c

index d161878..a912f2d 100644 (file)
@@ -1304,6 +1304,8 @@ static void prepare_write_message_footer(struct ceph_connection *con)
        con->out_msg_done = true;
 }
 
+static void ceph_con_get_out_msg(struct ceph_connection *con);
+
 /*
  * Prepare headers for the next outgoing message.
  */
@@ -1325,26 +1327,8 @@ static void prepare_write_message(struct ceph_connection *con)
                        &con->out_temp_ack);
        }
 
-       BUG_ON(list_empty(&con->out_queue));
-       m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
-       con->out_msg = m;
-       BUG_ON(m->con != con);
-
-       /* put message on sent list */
-       ceph_msg_get(m);
-       list_move_tail(&m->list_head, &con->out_sent);
-
-       /*
-        * only assign outgoing seq # if we haven't sent this message
-        * yet.  if it is requeued, resend with it's original seq.
-        */
-       if (m->needs_out_seq) {
-               m->hdr.seq = cpu_to_le64(++con->out_seq);
-               m->needs_out_seq = false;
-
-               if (con->ops->reencode_message)
-                       con->ops->reencode_message(m);
-       }
+       ceph_con_get_out_msg(con);
+       m = con->out_msg;
 
        dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
             m, con->out_seq, le16_to_cpu(m->hdr.type),
@@ -3118,6 +3102,8 @@ static void clear_standby(struct ceph_connection *con)
 
 /*
  * Queue up an outgoing message on the given connection.
+ *
+ * Consumes a ref on @msg.
  */
 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 {
@@ -3503,6 +3489,39 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con,
        return ret;
 }
 
+static void ceph_con_get_out_msg(struct ceph_connection *con)
+{
+       struct ceph_msg *msg;
+
+       BUG_ON(list_empty(&con->out_queue));
+       msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
+       WARN_ON(msg->con != con);
+
+       /*
+        * Put the message on "sent" list using a ref from ceph_con_send().
+        * It is put when the message is acked or revoked.
+        */
+       list_move_tail(&msg->list_head, &con->out_sent);
+
+       /*
+        * Only assign outgoing seq # if we haven't sent this message
+        * yet.  If it is requeued, resend with it's original seq.
+        */
+       if (msg->needs_out_seq) {
+               msg->hdr.seq = cpu_to_le64(++con->out_seq);
+               msg->needs_out_seq = false;
+
+               if (con->ops->reencode_message)
+                       con->ops->reencode_message(msg);
+       }
+
+       /*
+        * Get a ref for out_msg.  It is put when we are done sending the
+        * message or in case of a fault.
+        */
+       WARN_ON(con->out_msg);
+       con->out_msg = ceph_msg_get(msg);
+}
 
 /*
  * Free a generically kmalloc'd message.