libceph, ceph: implement msgr2.1 protocol (crc and secure modes)
[linux-2.6-microblaze.git] / net / ceph / messenger.c
index af0f1fa..57d043b 100644 (file)
 #define CON_SOCK_STATE_CONNECTED       3       /* -> CLOSING or -> CLOSED */
 #define CON_SOCK_STATE_CLOSING         4       /* -> CLOSED */
 
-/*
- * connection states
- */
-#define CON_STATE_CLOSED        1  /* -> PREOPEN */
-#define CON_STATE_PREOPEN       2  /* -> CONNECTING, CLOSED */
-#define CON_STATE_CONNECTING    3  /* -> NEGOTIATING, CLOSED */
-#define CON_STATE_NEGOTIATING   4  /* -> OPEN, CLOSED */
-#define CON_STATE_OPEN          5  /* -> STANDBY, CLOSED */
-#define CON_STATE_STANDBY       6  /* -> PREOPEN, CLOSED */
-
-/*
- * ceph_connection flag bits
- */
-#define CON_FLAG_LOSSYTX           0  /* we can close channel or drop
-                                      * messages on errors */
-#define CON_FLAG_KEEPALIVE_PENDING 1  /* we need to send a keepalive */
-#define CON_FLAG_WRITE_PENDING    2  /* we have data ready to send */
-#define CON_FLAG_SOCK_CLOSED      3  /* socket state changed to closed */
-#define CON_FLAG_BACKOFF           4  /* need to retry queuing delayed work */
-
 static bool con_flag_valid(unsigned long con_flag)
 {
        switch (con_flag) {
-       case CON_FLAG_LOSSYTX:
-       case CON_FLAG_KEEPALIVE_PENDING:
-       case CON_FLAG_WRITE_PENDING:
-       case CON_FLAG_SOCK_CLOSED:
-       case CON_FLAG_BACKOFF:
+       case CEPH_CON_F_LOSSYTX:
+       case CEPH_CON_F_KEEPALIVE_PENDING:
+       case CEPH_CON_F_WRITE_PENDING:
+       case CEPH_CON_F_SOCK_CLOSED:
+       case CEPH_CON_F_BACKOFF:
                return true;
        default:
                return false;
        }
 }
 
-static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
+void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
 {
        BUG_ON(!con_flag_valid(con_flag));
 
        clear_bit(con_flag, &con->flags);
 }
 
-static void con_flag_set(struct ceph_connection *con, unsigned long con_flag)
+void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag)
 {
        BUG_ON(!con_flag_valid(con_flag));
 
        set_bit(con_flag, &con->flags);
 }
 
-static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag)
+bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag)
 {
        BUG_ON(!con_flag_valid(con_flag));
 
        return test_bit(con_flag, &con->flags);
 }
 
-static bool con_flag_test_and_clear(struct ceph_connection *con,
-                                       unsigned long con_flag)
+bool ceph_con_flag_test_and_clear(struct ceph_connection *con,
+                                 unsigned long con_flag)
 {
        BUG_ON(!con_flag_valid(con_flag));
 
        return test_and_clear_bit(con_flag, &con->flags);
 }
 
-static bool con_flag_test_and_set(struct ceph_connection *con,
-                                       unsigned long con_flag)
+bool ceph_con_flag_test_and_set(struct ceph_connection *con,
+                               unsigned long con_flag)
 {
        BUG_ON(!con_flag_valid(con_flag));
 
@@ -157,12 +137,6 @@ static bool con_flag_test_and_set(struct ceph_connection *con,
 
 static struct kmem_cache       *ceph_msg_cache;
 
-/* static tag bytes (protocol control messages) */
-static char tag_msg = CEPH_MSGR_TAG_MSG;
-static char tag_ack = CEPH_MSGR_TAG_ACK;
-static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
-static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
-
 #ifdef CONFIG_LOCKDEP
 static struct lock_class_key socket_class;
 #endif
@@ -184,7 +158,7 @@ static void con_fault(struct ceph_connection *con);
 static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
 static atomic_t addr_str_seq = ATOMIC_INIT(0);
 
-static struct page *zero_page;         /* used in certain error cases */
+struct page *ceph_zero_page;           /* used in certain error cases */
 
 const char *ceph_pr_addr(const struct ceph_entity_addr *addr)
 {
@@ -219,10 +193,13 @@ const char *ceph_pr_addr(const struct ceph_entity_addr *addr)
 }
 EXPORT_SYMBOL(ceph_pr_addr);
 
-static void encode_my_addr(struct ceph_messenger *msgr)
+void ceph_encode_my_addr(struct ceph_messenger *msgr)
 {
-       memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
-       ceph_encode_banner_addr(&msgr->my_enc_addr);
+       if (!ceph_msgr2(from_msgr(msgr))) {
+               memcpy(&msgr->my_enc_addr, &msgr->inst.addr,
+                      sizeof(msgr->my_enc_addr));
+               ceph_encode_banner_addr(&msgr->my_enc_addr);
+       }
 }
 
 /*
@@ -254,9 +231,9 @@ static void _ceph_msgr_exit(void)
                ceph_msgr_wq = NULL;
        }
 
-       BUG_ON(zero_page == NULL);
-       put_page(zero_page);
-       zero_page = NULL;
+       BUG_ON(!ceph_zero_page);
+       put_page(ceph_zero_page);
+       ceph_zero_page = NULL;
 
        ceph_msgr_slab_exit();
 }
@@ -266,9 +243,9 @@ int __init ceph_msgr_init(void)
        if (ceph_msgr_slab_init())
                return -ENOMEM;
 
-       BUG_ON(zero_page != NULL);
-       zero_page = ZERO_PAGE(0);
-       get_page(zero_page);
+       BUG_ON(ceph_zero_page);
+       ceph_zero_page = ZERO_PAGE(0);
+       get_page(ceph_zero_page);
 
        /*
         * The number of active work items is limited by the number of
@@ -372,7 +349,7 @@ static void ceph_sock_data_ready(struct sock *sk)
        }
 
        if (sk->sk_state != TCP_CLOSE_WAIT) {
-               dout("%s on %p state = %lu, queueing work\n", __func__,
+               dout("%s %p state = %d, queueing work\n", __func__,
                     con, con->state);
                queue_con(con);
        }
@@ -390,7 +367,7 @@ static void ceph_sock_write_space(struct sock *sk)
         * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
         * and net/core/stream.c:sk_stream_write_space().
         */
-       if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) {
+       if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) {
                if (sk_stream_is_writeable(sk)) {
                        dout("%s %p queueing write work\n", __func__, con);
                        clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
@@ -406,7 +383,7 @@ static void ceph_sock_state_change(struct sock *sk)
 {
        struct ceph_connection *con = sk->sk_user_data;
 
-       dout("%s %p state = %lu sk_state = %u\n", __func__,
+       dout("%s %p state = %d sk_state = %u\n", __func__,
             con, con->state, sk->sk_state);
 
        switch (sk->sk_state) {
@@ -416,7 +393,7 @@ static void ceph_sock_state_change(struct sock *sk)
        case TCP_CLOSE_WAIT:
                dout("%s TCP_CLOSE_WAIT\n", __func__);
                con_sock_state_closing(con);
-               con_flag_set(con, CON_FLAG_SOCK_CLOSED);
+               ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED);
                queue_con(con);
                break;
        case TCP_ESTABLISHED:
@@ -450,13 +427,15 @@ static void set_sock_callbacks(struct socket *sock,
 /*
  * initiate connection to a remote socket.
  */
-static int ceph_tcp_connect(struct ceph_connection *con)
+int ceph_tcp_connect(struct ceph_connection *con)
 {
        struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */
        struct socket *sock;
        unsigned int noio_flag;
        int ret;
 
+       dout("%s con %p peer_addr %s\n", __func__, con,
+            ceph_pr_addr(&con->peer_addr));
        BUG_ON(con->sock);
 
        /* sock_create_kern() allocates with GFP_KERNEL */
@@ -474,8 +453,6 @@ static int ceph_tcp_connect(struct ceph_connection *con)
 
        set_sock_callbacks(sock, con);
 
-       dout("connect %s\n", ceph_pr_addr(&con->peer_addr));
-
        con_sock_state_connecting(con);
        ret = sock->ops->connect(sock, (struct sockaddr *)&ss, sizeof(ss),
                                 O_NONBLOCK);
@@ -497,104 +474,14 @@ static int ceph_tcp_connect(struct ceph_connection *con)
        return 0;
 }
 
-/*
- * If @buf is NULL, discard up to @len bytes.
- */
-static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
-{
-       struct kvec iov = {buf, len};
-       struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
-       int r;
-
-       if (!buf)
-               msg.msg_flags |= MSG_TRUNC;
-
-       iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len);
-       r = sock_recvmsg(sock, &msg, msg.msg_flags);
-       if (r == -EAGAIN)
-               r = 0;
-       return r;
-}
-
-static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
-                    int page_offset, size_t length)
-{
-       struct bio_vec bvec = {
-               .bv_page = page,
-               .bv_offset = page_offset,
-               .bv_len = length
-       };
-       struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
-       int r;
-
-       BUG_ON(page_offset + length > PAGE_SIZE);
-       iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length);
-       r = sock_recvmsg(sock, &msg, msg.msg_flags);
-       if (r == -EAGAIN)
-               r = 0;
-       return r;
-}
-
-/*
- * write something.  @more is true if caller will be sending more data
- * shortly.
- */
-static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
-                           size_t kvlen, size_t len, bool more)
-{
-       struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
-       int r;
-
-       if (more)
-               msg.msg_flags |= MSG_MORE;
-       else
-               msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
-
-       r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
-       if (r == -EAGAIN)
-               r = 0;
-       return r;
-}
-
-/*
- * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
- */
-static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
-                            int offset, size_t size, int more)
-{
-       ssize_t (*sendpage)(struct socket *sock, struct page *page,
-                           int offset, size_t size, int flags);
-       int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
-       int ret;
-
-       /*
-        * sendpage cannot properly handle pages with page_count == 0,
-        * we need to fall back to sendmsg if that's the case.
-        *
-        * Same goes for slab pages: skb_can_coalesce() allows
-        * coalescing neighboring slab objects into a single frag which
-        * triggers one of hardened usercopy checks.
-        */
-       if (sendpage_ok(page))
-               sendpage = sock->ops->sendpage;
-       else
-               sendpage = sock_no_sendpage;
-
-       ret = sendpage(sock, page, offset, size, flags);
-       if (ret == -EAGAIN)
-               ret = 0;
-
-       return ret;
-}
-
 /*
  * Shutdown/close the socket for the given connection.
  */
-static int con_close_socket(struct ceph_connection *con)
+int ceph_con_close_socket(struct ceph_connection *con)
 {
        int rc = 0;
 
-       dout("con_close_socket on %p sock %p\n", con, con->sock);
+       dout("%s con %p sock %p\n", __func__, con, con->sock);
        if (con->sock) {
                rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
                sock_release(con->sock);
@@ -607,12 +494,34 @@ static int con_close_socket(struct ceph_connection *con)
         * received a socket close event before we had the chance to
         * shut the socket down.
         */
-       con_flag_clear(con, CON_FLAG_SOCK_CLOSED);
+       ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED);
 
        con_sock_state_closed(con);
        return rc;
 }
 
+static void ceph_con_reset_protocol(struct ceph_connection *con)
+{
+       dout("%s con %p\n", __func__, con);
+
+       ceph_con_close_socket(con);
+       if (con->in_msg) {
+               WARN_ON(con->in_msg->con != con);
+               ceph_msg_put(con->in_msg);
+               con->in_msg = NULL;
+       }
+       if (con->out_msg) {
+               WARN_ON(con->out_msg->con != con);
+               ceph_msg_put(con->out_msg);
+               con->out_msg = NULL;
+       }
+
+       if (ceph_msgr2(from_msgr(con->msgr)))
+               ceph_con_v2_reset_protocol(con);
+       else
+               ceph_con_v1_reset_protocol(con);
+}
+
 /*
  * Reset a connection.  Discard all incoming and outgoing messages
  * and clear *_seq state.
@@ -623,6 +532,7 @@ static void ceph_msg_remove(struct ceph_msg *msg)
 
        ceph_msg_put(msg);
 }
+
 static void ceph_msg_remove_list(struct list_head *head)
 {
        while (!list_empty(head)) {
@@ -632,31 +542,22 @@ static void ceph_msg_remove_list(struct list_head *head)
        }
 }
 
-static void reset_connection(struct ceph_connection *con)
+void ceph_con_reset_session(struct ceph_connection *con)
 {
-       /* reset connection, out_queue, msg_ and connect_seq */
-       /* discard existing out_queue and msg_seq */
-       dout("reset_connection %p\n", con);
+       dout("%s con %p\n", __func__, con);
+
+       WARN_ON(con->in_msg);
+       WARN_ON(con->out_msg);
        ceph_msg_remove_list(&con->out_queue);
        ceph_msg_remove_list(&con->out_sent);
-
-       if (con->in_msg) {
-               BUG_ON(con->in_msg->con != con);
-               ceph_msg_put(con->in_msg);
-               con->in_msg = NULL;
-       }
-
-       con->connect_seq = 0;
        con->out_seq = 0;
-       if (con->out_msg) {
-               BUG_ON(con->out_msg->con != con);
-               ceph_msg_put(con->out_msg);
-               con->out_msg = NULL;
-       }
        con->in_seq = 0;
        con->in_seq_acked = 0;
 
-       con->out_skip = 0;
+       if (ceph_msgr2(from_msgr(con->msgr)))
+               ceph_con_v2_reset_session(con);
+       else
+               ceph_con_v1_reset_session(con);
 }
 
 /*
@@ -666,17 +567,17 @@ void ceph_con_close(struct ceph_connection *con)
 {
        mutex_lock(&con->mutex);
        dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
-       con->state = CON_STATE_CLOSED;
+       con->state = CEPH_CON_S_CLOSED;
 
-       con_flag_clear(con, CON_FLAG_LOSSYTX);  /* so we retry next connect */
-       con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING);
-       con_flag_clear(con, CON_FLAG_WRITE_PENDING);
-       con_flag_clear(con, CON_FLAG_BACKOFF);
+       ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX);  /* so we retry next
+                                                         connect */
+       ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING);
+       ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
+       ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF);
 
-       reset_connection(con);
-       con->peer_global_seq = 0;
+       ceph_con_reset_protocol(con);
+       ceph_con_reset_session(con);
        cancel_con(con);
-       con_close_socket(con);
        mutex_unlock(&con->mutex);
 }
 EXPORT_SYMBOL(ceph_con_close);
@@ -691,8 +592,8 @@ void ceph_con_open(struct ceph_connection *con,
        mutex_lock(&con->mutex);
        dout("con_open %p %s\n", con, ceph_pr_addr(addr));
 
-       WARN_ON(con->state != CON_STATE_CLOSED);
-       con->state = CON_STATE_PREOPEN;
+       WARN_ON(con->state != CEPH_CON_S_CLOSED);
+       con->state = CEPH_CON_S_PREOPEN;
 
        con->peer_name.type = (__u8) entity_type;
        con->peer_name.num = cpu_to_le64(entity_num);
@@ -709,7 +610,10 @@ EXPORT_SYMBOL(ceph_con_open);
  */
 bool ceph_con_opened(struct ceph_connection *con)
 {
-       return con->connect_seq > 0;
+       if (ceph_msgr2(from_msgr(con->msgr)))
+               return ceph_con_v2_opened(con);
+
+       return ceph_con_v1_opened(con);
 }
 
 /*
@@ -732,16 +636,15 @@ void ceph_con_init(struct ceph_connection *con, void *private,
        INIT_LIST_HEAD(&con->out_sent);
        INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
 
-       con->state = CON_STATE_CLOSED;
+       con->state = CEPH_CON_S_CLOSED;
 }
 EXPORT_SYMBOL(ceph_con_init);
 
-
 /*
  * We maintain a global counter to order connection attempts.  Get
  * a unique seq greater than @gt.
  */
-static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
+u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt)
 {
        u32 ret;
 
@@ -753,48 +656,53 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
        return ret;
 }
 
-static void con_out_kvec_reset(struct ceph_connection *con)
-{
-       BUG_ON(con->out_skip);
-
-       con->out_kvec_left = 0;
-       con->out_kvec_bytes = 0;
-       con->out_kvec_cur = &con->out_kvec[0];
-}
-
-static void con_out_kvec_add(struct ceph_connection *con,
-                               size_t size, void *data)
+/*
+ * Discard messages that have been acked by the server.
+ */
+void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
 {
-       int index = con->out_kvec_left;
+       struct ceph_msg *msg;
+       u64 seq;
 
-       BUG_ON(con->out_skip);
-       BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
+       dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq);
+       while (!list_empty(&con->out_sent)) {
+               msg = list_first_entry(&con->out_sent, struct ceph_msg,
+                                      list_head);
+               WARN_ON(msg->needs_out_seq);
+               seq = le64_to_cpu(msg->hdr.seq);
+               if (seq > ack_seq)
+                       break;
 
-       con->out_kvec[index].iov_len = size;
-       con->out_kvec[index].iov_base = data;
-       con->out_kvec_left++;
-       con->out_kvec_bytes += size;
+               dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
+                    msg, seq);
+               ceph_msg_remove(msg);
+       }
 }
 
 /*
- * Chop off a kvec from the end.  Return residual number of bytes for
- * that kvec, i.e. how many bytes would have been written if the kvec
- * hadn't been nuked.
+ * Discard messages that have been requeued in con_fault(), up to
+ * reconnect_seq.  This avoids gratuitously resending messages that
+ * the server had received and handled prior to reconnect.
  */
-static int con_out_kvec_skip(struct ceph_connection *con)
+void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
 {
-       int off = con->out_kvec_cur - con->out_kvec;
-       int skip = 0;
+       struct ceph_msg *msg;
+       u64 seq;
 
-       if (con->out_kvec_bytes > 0) {
-               skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
-               BUG_ON(con->out_kvec_bytes < skip);
-               BUG_ON(!con->out_kvec_left);
-               con->out_kvec_bytes -= skip;
-               con->out_kvec_left--;
-       }
+       dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
+       while (!list_empty(&con->out_queue)) {
+               msg = list_first_entry(&con->out_queue, struct ceph_msg,
+                                      list_head);
+               if (msg->needs_out_seq)
+                       break;
+               seq = le64_to_cpu(msg->hdr.seq);
+               if (seq > reconnect_seq)
+                       break;
 
-       return skip;
+               dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
+                    msg, seq);
+               ceph_msg_remove(msg);
+       }
 }
 
 #ifdef CONFIG_BLOCK
@@ -1113,10 +1021,9 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
        cursor->need_crc = true;
 }
 
-static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
+void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
+                              struct ceph_msg *msg, size_t length)
 {
-       struct ceph_msg_data_cursor *cursor = &msg->cursor;
-
        BUG_ON(!length);
        BUG_ON(length > msg->data_length);
        BUG_ON(!msg->num_data_items);
@@ -1132,9 +1039,9 @@ static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
  * data item, and supply the page offset and length of that piece.
  * Indicate whether this is the last piece in this data item.
  */
-static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
-                                       size_t *page_offset, size_t *length,
-                                       bool *last_piece)
+struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
+                               size_t *page_offset, size_t *length,
+                               bool *last_piece)
 {
        struct page *page;
 
@@ -1173,8 +1080,7 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
  * Returns true if the result moves the cursor on to the next piece
  * of the data item.
  */
-static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
-                                 size_t bytes)
+void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
 {
        bool new_piece;
 
@@ -1210,1303 +1116,234 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
        cursor->need_crc = new_piece;
 }
 
-static size_t sizeof_footer(struct ceph_connection *con)
+u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
+                    unsigned int length)
 {
-       return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
-           sizeof(struct ceph_msg_footer) :
-           sizeof(struct ceph_msg_footer_old);
+       char *kaddr;
+
+       kaddr = kmap(page);
+       BUG_ON(kaddr == NULL);
+       crc = crc32c(crc, kaddr + page_offset, length);
+       kunmap(page);
+
+       return crc;
 }
 
-static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
+bool ceph_addr_is_blank(const struct ceph_entity_addr *addr)
 {
-       /* Initialize data cursor */
+       struct sockaddr_storage ss = addr->in_addr; /* align */
+       struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr;
+       struct in6_addr *addr6 = &((struct sockaddr_in6 *)&ss)->sin6_addr;
 
-       ceph_msg_data_cursor_init(msg, (size_t)data_len);
+       switch (ss.ss_family) {
+       case AF_INET:
+               return addr4->s_addr == htonl(INADDR_ANY);
+       case AF_INET6:
+               return ipv6_addr_any(addr6);
+       default:
+               return true;
+       }
 }
 
-/*
- * Prepare footer for currently outgoing message, and finish things
- * off.  Assumes out_kvec* are already valid.. we just add on to the end.
- */
-static void prepare_write_message_footer(struct ceph_connection *con)
+int ceph_addr_port(const struct ceph_entity_addr *addr)
 {
-       struct ceph_msg *m = con->out_msg;
-
-       m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
+       switch (get_unaligned(&addr->in_addr.ss_family)) {
+       case AF_INET:
+               return ntohs(get_unaligned(&((struct sockaddr_in *)&addr->in_addr)->sin_port));
+       case AF_INET6:
+               return ntohs(get_unaligned(&((struct sockaddr_in6 *)&addr->in_addr)->sin6_port));
+       }
+       return 0;
+}
 
-       dout("prepare_write_message_footer %p\n", con);
-       con_out_kvec_add(con, sizeof_footer(con), &m->footer);
-       if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
-               if (con->ops->sign_message)
-                       con->ops->sign_message(m);
-               else
-                       m->footer.sig = 0;
-       } else {
-               m->old_footer.flags = m->footer.flags;
+void ceph_addr_set_port(struct ceph_entity_addr *addr, int p)
+{
+       switch (get_unaligned(&addr->in_addr.ss_family)) {
+       case AF_INET:
+               put_unaligned(htons(p), &((struct sockaddr_in *)&addr->in_addr)->sin_port);
+               break;
+       case AF_INET6:
+               put_unaligned(htons(p), &((struct sockaddr_in6 *)&addr->in_addr)->sin6_port);
+               break;
        }
-       con->out_more = m->more_to_follow;
-       con->out_msg_done = true;
 }
 
 /*
- * Prepare headers for the next outgoing message.
+ * Unlike other *_pton function semantics, zero indicates success.
  */
-static void prepare_write_message(struct ceph_connection *con)
+static int ceph_pton(const char *str, size_t len, struct ceph_entity_addr *addr,
+               char delim, const char **ipend)
 {
-       struct ceph_msg *m;
-       u32 crc;
-
-       con_out_kvec_reset(con);
-       con->out_msg_done = false;
-
-       /* Sneak an ack in there first?  If we can get it into the same
-        * TCP packet that's a good thing. */
-       if (con->in_seq > con->in_seq_acked) {
-               con->in_seq_acked = con->in_seq;
-               con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
-               con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-               con_out_kvec_add(con, sizeof (con->out_temp_ack),
-                       &con->out_temp_ack);
-       }
-
-       BUG_ON(list_empty(&con->out_queue));
-       m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
-       con->out_msg = m;
-       BUG_ON(m->con != con);
-
-       /* put message on sent list */
-       ceph_msg_get(m);
-       list_move_tail(&m->list_head, &con->out_sent);
-
-       /*
-        * only assign outgoing seq # if we haven't sent this message
-        * yet.  if it is requeued, resend with it's original seq.
-        */
-       if (m->needs_out_seq) {
-               m->hdr.seq = cpu_to_le64(++con->out_seq);
-               m->needs_out_seq = false;
+       memset(&addr->in_addr, 0, sizeof(addr->in_addr));
 
-               if (con->ops->reencode_message)
-                       con->ops->reencode_message(m);
+       if (in4_pton(str, len, (u8 *)&((struct sockaddr_in *)&addr->in_addr)->sin_addr.s_addr, delim, ipend)) {
+               put_unaligned(AF_INET, &addr->in_addr.ss_family);
+               return 0;
        }
 
-       dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
-            m, con->out_seq, le16_to_cpu(m->hdr.type),
-            le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
-            m->data_length);
-       WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
-       WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
-
-       /* tag + hdr + front + middle */
-       con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
-       con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
-       con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
-
-       if (m->middle)
-               con_out_kvec_add(con, m->middle->vec.iov_len,
-                       m->middle->vec.iov_base);
-
-       /* fill in hdr crc and finalize hdr */
-       crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
-       con->out_msg->hdr.crc = cpu_to_le32(crc);
-       memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
-
-       /* fill in front and middle crc, footer */
-       crc = crc32c(0, m->front.iov_base, m->front.iov_len);
-       con->out_msg->footer.front_crc = cpu_to_le32(crc);
-       if (m->middle) {
-               crc = crc32c(0, m->middle->vec.iov_base,
-                               m->middle->vec.iov_len);
-               con->out_msg->footer.middle_crc = cpu_to_le32(crc);
-       } else
-               con->out_msg->footer.middle_crc = 0;
-       dout("%s front_crc %u middle_crc %u\n", __func__,
-            le32_to_cpu(con->out_msg->footer.front_crc),
-            le32_to_cpu(con->out_msg->footer.middle_crc));
-       con->out_msg->footer.flags = 0;
-
-       /* is there a data payload? */
-       con->out_msg->footer.data_crc = 0;
-       if (m->data_length) {
-               prepare_message_data(con->out_msg, m->data_length);
-               con->out_more = 1;  /* data + footer will follow */
-       } else {
-               /* no, queue up footer too and be done */
-               prepare_write_message_footer(con);
+       if (in6_pton(str, len, (u8 *)&((struct sockaddr_in6 *)&addr->in_addr)->sin6_addr.s6_addr, delim, ipend)) {
+               put_unaligned(AF_INET6, &addr->in_addr.ss_family);
+               return 0;
        }
 
-       con_flag_set(con, CON_FLAG_WRITE_PENDING);
+       return -EINVAL;
 }
 
 /*
- * Prepare an ack.
+ * Extract hostname string and resolve using kernel DNS facility.
  */
-static void prepare_write_ack(struct ceph_connection *con)
+#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
+static int ceph_dns_resolve_name(const char *name, size_t namelen,
+               struct ceph_entity_addr *addr, char delim, const char **ipend)
 {
-       dout("prepare_write_ack %p %llu -> %llu\n", con,
-            con->in_seq_acked, con->in_seq);
-       con->in_seq_acked = con->in_seq;
+       const char *end, *delim_p;
+       char *colon_p, *ip_addr = NULL;
+       int ip_len, ret;
 
-       con_out_kvec_reset(con);
+       /*
+        * The end of the hostname occurs immediately preceding the delimiter or
+        * the port marker (':') where the delimiter takes precedence.
+        */
+       delim_p = memchr(name, delim, namelen);
+       colon_p = memchr(name, ':', namelen);
 
-       con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+       if (delim_p && colon_p)
+               end = delim_p < colon_p ? delim_p : colon_p;
+       else if (!delim_p && colon_p)
+               end = colon_p;
+       else {
+               end = delim_p;
+               if (!end) /* case: hostname:/ */
+                       end = name + namelen;
+       }
 
-       con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-       con_out_kvec_add(con, sizeof (con->out_temp_ack),
-                               &con->out_temp_ack);
+       if (end <= name)
+               return -EINVAL;
 
-       con->out_more = 1;  /* more will follow.. eventually.. */
-       con_flag_set(con, CON_FLAG_WRITE_PENDING);
-}
+       /* do dns_resolve upcall */
+       ip_len = dns_query(current->nsproxy->net_ns,
+                          NULL, name, end - name, NULL, &ip_addr, NULL, false);
+       if (ip_len > 0)
+               ret = ceph_pton(ip_addr, ip_len, addr, -1, NULL);
+       else
+               ret = -ESRCH;
 
-/*
- * Prepare to share the seq during handshake
- */
-static void prepare_write_seq(struct ceph_connection *con)
-{
-       dout("prepare_write_seq %p %llu -> %llu\n", con,
-            con->in_seq_acked, con->in_seq);
-       con->in_seq_acked = con->in_seq;
+       kfree(ip_addr);
 
-       con_out_kvec_reset(con);
+       *ipend = end;
 
-       con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-       con_out_kvec_add(con, sizeof (con->out_temp_ack),
-                        &con->out_temp_ack);
+       pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
+                       ret, ret ? "failed" : ceph_pr_addr(addr));
 
-       con_flag_set(con, CON_FLAG_WRITE_PENDING);
+       return ret;
 }
-
-/*
- * Prepare to write keepalive byte.
- */
-static void prepare_write_keepalive(struct ceph_connection *con)
+#else
+static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
+               struct ceph_entity_addr *addr, char delim, const char **ipend)
 {
-       dout("prepare_write_keepalive %p\n", con);
-       con_out_kvec_reset(con);
-       if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
-               struct timespec64 now;
-
-               ktime_get_real_ts64(&now);
-               con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
-               ceph_encode_timespec64(&con->out_temp_keepalive2, &now);
-               con_out_kvec_add(con, sizeof(con->out_temp_keepalive2),
-                                &con->out_temp_keepalive2);
-       } else {
-               con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
-       }
-       con_flag_set(con, CON_FLAG_WRITE_PENDING);
+       return -EINVAL;
 }
+#endif
 
 /*
- * Connection negotiation.
+ * Parse a server name (IP or hostname). If a valid IP address is not found
+ * then try to extract a hostname to resolve using userspace DNS upcall.
  */
-
-static int get_connect_authorizer(struct ceph_connection *con)
+static int ceph_parse_server_name(const char *name, size_t namelen,
+               struct ceph_entity_addr *addr, char delim, const char **ipend)
 {
-       struct ceph_auth_handshake *auth;
-       int auth_proto;
-
-       if (!con->ops->get_authorizer) {
-               con->auth = NULL;
-               con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
-               con->out_connect.authorizer_len = 0;
-               return 0;
-       }
+       int ret;
 
-       auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry);
-       if (IS_ERR(auth))
-               return PTR_ERR(auth);
+       ret = ceph_pton(name, namelen, addr, delim, ipend);
+       if (ret)
+               ret = ceph_dns_resolve_name(name, namelen, addr, delim, ipend);
 
-       con->auth = auth;
-       con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
-       con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len);
-       return 0;
+       return ret;
 }
 
 /*
- * We connected to a peer and are saying hello.
+ * Parse an ip[:port] list into an addr array.  Use the default
+ * monitor port if a port isn't specified.
  */
-static void prepare_write_banner(struct ceph_connection *con)
-{
-       con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
-       con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
-                                       &con->msgr->my_enc_addr);
-
-       con->out_more = 0;
-       con_flag_set(con, CON_FLAG_WRITE_PENDING);
-}
-
-static void __prepare_write_connect(struct ceph_connection *con)
-{
-       con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect);
-       if (con->auth)
-               con_out_kvec_add(con, con->auth->authorizer_buf_len,
-                                con->auth->authorizer_buf);
-
-       con->out_more = 0;
-       con_flag_set(con, CON_FLAG_WRITE_PENDING);
-}
-
-static int prepare_write_connect(struct ceph_connection *con)
+int ceph_parse_ips(const char *c, const char *end,
+                  struct ceph_entity_addr *addr,
+                  int max_count, int *count)
 {
-       unsigned int global_seq = get_global_seq(con->msgr, 0);
-       int proto;
-       int ret;
+       int i, ret = -EINVAL;
+       const char *p = c;
 
-       switch (con->peer_name.type) {
-       case CEPH_ENTITY_TYPE_MON:
-               proto = CEPH_MONC_PROTOCOL;
-               break;
-       case CEPH_ENTITY_TYPE_OSD:
-               proto = CEPH_OSDC_PROTOCOL;
-               break;
-       case CEPH_ENTITY_TYPE_MDS:
-               proto = CEPH_MDSC_PROTOCOL;
-               break;
-       default:
-               BUG();
-       }
+       dout("parse_ips on '%.*s'\n", (int)(end-c), c);
+       for (i = 0; i < max_count; i++) {
+               const char *ipend;
+               int port;
+               char delim = ',';
 
-       dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
-            con->connect_seq, global_seq, proto);
+               if (*p == '[') {
+                       delim = ']';
+                       p++;
+               }
 
-       con->out_connect.features =
-           cpu_to_le64(from_msgr(con->msgr)->supported_features);
-       con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
-       con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
-       con->out_connect.global_seq = cpu_to_le32(global_seq);
-       con->out_connect.protocol_version = cpu_to_le32(proto);
-       con->out_connect.flags = 0;
+               ret = ceph_parse_server_name(p, end - p, &addr[i], delim, &ipend);
+               if (ret)
+                       goto bad;
+               ret = -EINVAL;
 
-       ret = get_connect_authorizer(con);
-       if (ret)
-               return ret;
+               p = ipend;
 
-       __prepare_write_connect(con);
-       return 0;
-}
-
-/*
- * write as much of pending kvecs to the socket as we can.
- *  1 -> done
- *  0 -> socket full, but more to do
- * <0 -> error
- */
-static int write_partial_kvec(struct ceph_connection *con)
-{
-       int ret;
-
-       dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
-       while (con->out_kvec_bytes > 0) {
-               ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
-                                      con->out_kvec_left, con->out_kvec_bytes,
-                                      con->out_more);
-               if (ret <= 0)
-                       goto out;
-               con->out_kvec_bytes -= ret;
-               if (con->out_kvec_bytes == 0)
-                       break;            /* done */
-
-               /* account for full iov entries consumed */
-               while (ret >= con->out_kvec_cur->iov_len) {
-                       BUG_ON(!con->out_kvec_left);
-                       ret -= con->out_kvec_cur->iov_len;
-                       con->out_kvec_cur++;
-                       con->out_kvec_left--;
-               }
-               /* and for a partially-consumed entry */
-               if (ret) {
-                       con->out_kvec_cur->iov_len -= ret;
-                       con->out_kvec_cur->iov_base += ret;
-               }
-       }
-       con->out_kvec_left = 0;
-       ret = 1;
-out:
-       dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
-            con->out_kvec_bytes, con->out_kvec_left, ret);
-       return ret;  /* done! */
-}
-
-static u32 ceph_crc32c_page(u32 crc, struct page *page,
-                               unsigned int page_offset,
-                               unsigned int length)
-{
-       char *kaddr;
-
-       kaddr = kmap(page);
-       BUG_ON(kaddr == NULL);
-       crc = crc32c(crc, kaddr + page_offset, length);
-       kunmap(page);
-
-       return crc;
-}
-/*
- * Write as much message data payload as we can.  If we finish, queue
- * up the footer.
- *  1 -> done, footer is now queued in out_kvec[].
- *  0 -> socket full, but more to do
- * <0 -> error
- */
-static int write_partial_message_data(struct ceph_connection *con)
-{
-       struct ceph_msg *msg = con->out_msg;
-       struct ceph_msg_data_cursor *cursor = &msg->cursor;
-       bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
-       int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
-       u32 crc;
-
-       dout("%s %p msg %p\n", __func__, con, msg);
-
-       if (!msg->num_data_items)
-               return -EINVAL;
-
-       /*
-        * Iterate through each page that contains data to be
-        * written, and send as much as possible for each.
-        *
-        * If we are calculating the data crc (the default), we will
-        * need to map the page.  If we have no pages, they have
-        * been revoked, so use the zero page.
-        */
-       crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
-       while (cursor->total_resid) {
-               struct page *page;
-               size_t page_offset;
-               size_t length;
-               int ret;
-
-               if (!cursor->resid) {
-                       ceph_msg_data_advance(cursor, 0);
-                       continue;
-               }
-
-               page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
-               if (length == cursor->total_resid)
-                       more = MSG_MORE;
-               ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
-                                       more);
-               if (ret <= 0) {
-                       if (do_datacrc)
-                               msg->footer.data_crc = cpu_to_le32(crc);
-
-                       return ret;
-               }
-               if (do_datacrc && cursor->need_crc)
-                       crc = ceph_crc32c_page(crc, page, page_offset, length);
-               ceph_msg_data_advance(cursor, (size_t)ret);
-       }
-
-       dout("%s %p msg %p done\n", __func__, con, msg);
-
-       /* prepare and queue up footer, too */
-       if (do_datacrc)
-               msg->footer.data_crc = cpu_to_le32(crc);
-       else
-               msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
-       con_out_kvec_reset(con);
-       prepare_write_message_footer(con);
-
-       return 1;       /* must return > 0 to indicate success */
-}
-
-/*
- * write some zeros
- */
-static int write_partial_skip(struct ceph_connection *con)
-{
-       int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
-       int ret;
-
-       dout("%s %p %d left\n", __func__, con, con->out_skip);
-       while (con->out_skip > 0) {
-               size_t size = min(con->out_skip, (int) PAGE_SIZE);
-
-               if (size == con->out_skip)
-                       more = MSG_MORE;
-               ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, more);
-               if (ret <= 0)
-                       goto out;
-               con->out_skip -= ret;
-       }
-       ret = 1;
-out:
-       return ret;
-}
-
-/*
- * Prepare to read connection handshake, or an ack.
- */
-static void prepare_read_banner(struct ceph_connection *con)
-{
-       dout("prepare_read_banner %p\n", con);
-       con->in_base_pos = 0;
-}
-
-static void prepare_read_connect(struct ceph_connection *con)
-{
-       dout("prepare_read_connect %p\n", con);
-       con->in_base_pos = 0;
-}
-
-static void prepare_read_ack(struct ceph_connection *con)
-{
-       dout("prepare_read_ack %p\n", con);
-       con->in_base_pos = 0;
-}
-
-static void prepare_read_seq(struct ceph_connection *con)
-{
-       dout("prepare_read_seq %p\n", con);
-       con->in_base_pos = 0;
-       con->in_tag = CEPH_MSGR_TAG_SEQ;
-}
-
-static void prepare_read_tag(struct ceph_connection *con)
-{
-       dout("prepare_read_tag %p\n", con);
-       con->in_base_pos = 0;
-       con->in_tag = CEPH_MSGR_TAG_READY;
-}
-
-static void prepare_read_keepalive_ack(struct ceph_connection *con)
-{
-       dout("prepare_read_keepalive_ack %p\n", con);
-       con->in_base_pos = 0;
-}
-
-/*
- * Prepare to read a message.
- */
-static int prepare_read_message(struct ceph_connection *con)
-{
-       dout("prepare_read_message %p\n", con);
-       BUG_ON(con->in_msg != NULL);
-       con->in_base_pos = 0;
-       con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
-       return 0;
-}
-
-
-static int read_partial(struct ceph_connection *con,
-                       int end, int size, void *object)
-{
-       while (con->in_base_pos < end) {
-               int left = end - con->in_base_pos;
-               int have = size - left;
-               int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
-               if (ret <= 0)
-                       return ret;
-               con->in_base_pos += ret;
-       }
-       return 1;
-}
-
-
-/*
- * Read all or part of the connect-side handshake on a new connection
- */
-static int read_partial_banner(struct ceph_connection *con)
-{
-       int size;
-       int end;
-       int ret;
-
-       dout("read_partial_banner %p at %d\n", con, con->in_base_pos);
-
-       /* peer's banner */
-       size = strlen(CEPH_BANNER);
-       end = size;
-       ret = read_partial(con, end, size, con->in_banner);
-       if (ret <= 0)
-               goto out;
-
-       size = sizeof (con->actual_peer_addr);
-       end += size;
-       ret = read_partial(con, end, size, &con->actual_peer_addr);
-       if (ret <= 0)
-               goto out;
-       ceph_decode_banner_addr(&con->actual_peer_addr);
-
-       size = sizeof (con->peer_addr_for_me);
-       end += size;
-       ret = read_partial(con, end, size, &con->peer_addr_for_me);
-       if (ret <= 0)
-               goto out;
-       ceph_decode_banner_addr(&con->peer_addr_for_me);
-
-out:
-       return ret;
-}
-
-static int read_partial_connect(struct ceph_connection *con)
-{
-       int size;
-       int end;
-       int ret;
-
-       dout("read_partial_connect %p at %d\n", con, con->in_base_pos);
-
-       size = sizeof (con->in_reply);
-       end = size;
-       ret = read_partial(con, end, size, &con->in_reply);
-       if (ret <= 0)
-               goto out;
-
-       if (con->auth) {
-               size = le32_to_cpu(con->in_reply.authorizer_len);
-               if (size > con->auth->authorizer_reply_buf_len) {
-                       pr_err("authorizer reply too big: %d > %zu\n", size,
-                              con->auth->authorizer_reply_buf_len);
-                       ret = -EINVAL;
-                       goto out;
-               }
-
-               end += size;
-               ret = read_partial(con, end, size,
-                                  con->auth->authorizer_reply_buf);
-               if (ret <= 0)
-                       goto out;
-       }
-
-       dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
-            con, (int)con->in_reply.tag,
-            le32_to_cpu(con->in_reply.connect_seq),
-            le32_to_cpu(con->in_reply.global_seq));
-out:
-       return ret;
-}
-
-/*
- * Verify the hello banner looks okay.
- */
-static int verify_hello(struct ceph_connection *con)
-{
-       if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
-               pr_err("connect to %s got bad banner\n",
-                      ceph_pr_addr(&con->peer_addr));
-               con->error_msg = "protocol error, bad banner";
-               return -1;
-       }
-       return 0;
-}
-
-static bool addr_is_blank(struct ceph_entity_addr *addr)
-{
-       struct sockaddr_storage ss = addr->in_addr; /* align */
-       struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr;
-       struct in6_addr *addr6 = &((struct sockaddr_in6 *)&ss)->sin6_addr;
-
-       switch (ss.ss_family) {
-       case AF_INET:
-               return addr4->s_addr == htonl(INADDR_ANY);
-       case AF_INET6:
-               return ipv6_addr_any(addr6);
-       default:
-               return true;
-       }
-}
-
-static int addr_port(struct ceph_entity_addr *addr)
-{
-       switch (get_unaligned(&addr->in_addr.ss_family)) {
-       case AF_INET:
-               return ntohs(get_unaligned(&((struct sockaddr_in *)&addr->in_addr)->sin_port));
-       case AF_INET6:
-               return ntohs(get_unaligned(&((struct sockaddr_in6 *)&addr->in_addr)->sin6_port));
-       }
-       return 0;
-}
-
-static void addr_set_port(struct ceph_entity_addr *addr, int p)
-{
-       switch (get_unaligned(&addr->in_addr.ss_family)) {
-       case AF_INET:
-               put_unaligned(htons(p), &((struct sockaddr_in *)&addr->in_addr)->sin_port);
-               break;
-       case AF_INET6:
-               put_unaligned(htons(p), &((struct sockaddr_in6 *)&addr->in_addr)->sin6_port);
-               break;
-       }
-}
-
-/*
- * Unlike other *_pton function semantics, zero indicates success.
- */
-static int ceph_pton(const char *str, size_t len, struct ceph_entity_addr *addr,
-               char delim, const char **ipend)
-{
-       memset(&addr->in_addr, 0, sizeof(addr->in_addr));
-
-       if (in4_pton(str, len, (u8 *)&((struct sockaddr_in *)&addr->in_addr)->sin_addr.s_addr, delim, ipend)) {
-               put_unaligned(AF_INET, &addr->in_addr.ss_family);
-               return 0;
-       }
-
-       if (in6_pton(str, len, (u8 *)&((struct sockaddr_in6 *)&addr->in_addr)->sin6_addr.s6_addr, delim, ipend)) {
-               put_unaligned(AF_INET6, &addr->in_addr.ss_family);
-               return 0;
-       }
-
-       return -EINVAL;
-}
-
-/*
- * Extract hostname string and resolve using kernel DNS facility.
- */
-#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
-static int ceph_dns_resolve_name(const char *name, size_t namelen,
-               struct ceph_entity_addr *addr, char delim, const char **ipend)
-{
-       const char *end, *delim_p;
-       char *colon_p, *ip_addr = NULL;
-       int ip_len, ret;
-
-       /*
-        * The end of the hostname occurs immediately preceding the delimiter or
-        * the port marker (':') where the delimiter takes precedence.
-        */
-       delim_p = memchr(name, delim, namelen);
-       colon_p = memchr(name, ':', namelen);
-
-       if (delim_p && colon_p)
-               end = delim_p < colon_p ? delim_p : colon_p;
-       else if (!delim_p && colon_p)
-               end = colon_p;
-       else {
-               end = delim_p;
-               if (!end) /* case: hostname:/ */
-                       end = name + namelen;
-       }
-
-       if (end <= name)
-               return -EINVAL;
-
-       /* do dns_resolve upcall */
-       ip_len = dns_query(current->nsproxy->net_ns,
-                          NULL, name, end - name, NULL, &ip_addr, NULL, false);
-       if (ip_len > 0)
-               ret = ceph_pton(ip_addr, ip_len, addr, -1, NULL);
-       else
-               ret = -ESRCH;
-
-       kfree(ip_addr);
-
-       *ipend = end;
-
-       pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
-                       ret, ret ? "failed" : ceph_pr_addr(addr));
-
-       return ret;
-}
-#else
-static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
-               struct ceph_entity_addr *addr, char delim, const char **ipend)
-{
-       return -EINVAL;
-}
-#endif
-
-/*
- * Parse a server name (IP or hostname). If a valid IP address is not found
- * then try to extract a hostname to resolve using userspace DNS upcall.
- */
-static int ceph_parse_server_name(const char *name, size_t namelen,
-               struct ceph_entity_addr *addr, char delim, const char **ipend)
-{
-       int ret;
-
-       ret = ceph_pton(name, namelen, addr, delim, ipend);
-       if (ret)
-               ret = ceph_dns_resolve_name(name, namelen, addr, delim, ipend);
-
-       return ret;
-}
-
-/*
- * Parse an ip[:port] list into an addr array.  Use the default
- * monitor port if a port isn't specified.
- */
-int ceph_parse_ips(const char *c, const char *end,
-                  struct ceph_entity_addr *addr,
-                  int max_count, int *count)
-{
-       int i, ret = -EINVAL;
-       const char *p = c;
-
-       dout("parse_ips on '%.*s'\n", (int)(end-c), c);
-       for (i = 0; i < max_count; i++) {
-               const char *ipend;
-               int port;
-               char delim = ',';
-
-               if (*p == '[') {
-                       delim = ']';
-                       p++;
-               }
-
-               ret = ceph_parse_server_name(p, end - p, &addr[i], delim, &ipend);
-               if (ret)
-                       goto bad;
-               ret = -EINVAL;
-
-               p = ipend;
-
-               if (delim == ']') {
-                       if (*p != ']') {
-                               dout("missing matching ']'\n");
-                               goto bad;
-                       }
-                       p++;
-               }
-
-               /* port? */
-               if (p < end && *p == ':') {
-                       port = 0;
-                       p++;
-                       while (p < end && *p >= '0' && *p <= '9') {
-                               port = (port * 10) + (*p - '0');
-                               p++;
-                       }
-                       if (port == 0)
-                               port = CEPH_MON_PORT;
-                       else if (port > 65535)
-                               goto bad;
-               } else {
-                       port = CEPH_MON_PORT;
-               }
-
-               addr_set_port(&addr[i], port);
-               addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
-
-               dout("parse_ips got %s\n", ceph_pr_addr(&addr[i]));
-
-               if (p == end)
-                       break;
-               if (*p != ',')
-                       goto bad;
-               p++;
-       }
-
-       if (p != end)
-               goto bad;
-
-       if (count)
-               *count = i + 1;
-       return 0;
-
-bad:
-       return ret;
-}
-
-static int process_banner(struct ceph_connection *con)
-{
-       dout("process_banner on %p\n", con);
-
-       if (verify_hello(con) < 0)
-               return -1;
-
-       /*
-        * Make sure the other end is who we wanted.  note that the other
-        * end may not yet know their ip address, so if it's 0.0.0.0, give
-        * them the benefit of the doubt.
-        */
-       if (memcmp(&con->peer_addr, &con->actual_peer_addr,
-                  sizeof(con->peer_addr)) != 0 &&
-           !(addr_is_blank(&con->actual_peer_addr) &&
-             con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
-               pr_warn("wrong peer, want %s/%u, got %s/%u\n",
-                       ceph_pr_addr(&con->peer_addr),
-                       le32_to_cpu(con->peer_addr.nonce),
-                       ceph_pr_addr(&con->actual_peer_addr),
-                       le32_to_cpu(con->actual_peer_addr.nonce));
-               con->error_msg = "wrong peer at address";
-               return -1;
-       }
-
-       /*
-        * did we learn our address?
-        */
-       if (addr_is_blank(&con->msgr->inst.addr)) {
-               int port = addr_port(&con->msgr->inst.addr);
-
-               memcpy(&con->msgr->inst.addr.in_addr,
-                      &con->peer_addr_for_me.in_addr,
-                      sizeof(con->peer_addr_for_me.in_addr));
-               addr_set_port(&con->msgr->inst.addr, port);
-               encode_my_addr(con->msgr);
-               dout("process_banner learned my addr is %s\n",
-                    ceph_pr_addr(&con->msgr->inst.addr));
-       }
-
-       return 0;
-}
-
-static int process_connect(struct ceph_connection *con)
-{
-       u64 sup_feat = from_msgr(con->msgr)->supported_features;
-       u64 req_feat = from_msgr(con->msgr)->required_features;
-       u64 server_feat = le64_to_cpu(con->in_reply.features);
-       int ret;
-
-       dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
-
-       if (con->auth) {
-               int len = le32_to_cpu(con->in_reply.authorizer_len);
-
-               /*
-                * Any connection that defines ->get_authorizer()
-                * should also define ->add_authorizer_challenge() and
-                * ->verify_authorizer_reply().
-                *
-                * See get_connect_authorizer().
-                */
-               if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
-                       ret = con->ops->add_authorizer_challenge(
-                                   con, con->auth->authorizer_reply_buf, len);
-                       if (ret < 0)
-                               return ret;
-
-                       con_out_kvec_reset(con);
-                       __prepare_write_connect(con);
-                       prepare_read_connect(con);
-                       return 0;
-               }
-
-               if (len) {
-                       ret = con->ops->verify_authorizer_reply(con);
-                       if (ret < 0) {
-                               con->error_msg = "bad authorize reply";
-                               return ret;
-                       }
-               }
-       }
-
-       switch (con->in_reply.tag) {
-       case CEPH_MSGR_TAG_FEATURES:
-               pr_err("%s%lld %s feature set mismatch,"
-                      " my %llx < server's %llx, missing %llx\n",
-                      ENTITY_NAME(con->peer_name),
-                      ceph_pr_addr(&con->peer_addr),
-                      sup_feat, server_feat, server_feat & ~sup_feat);
-               con->error_msg = "missing required protocol features";
-               reset_connection(con);
-               return -1;
-
-       case CEPH_MSGR_TAG_BADPROTOVER:
-               pr_err("%s%lld %s protocol version mismatch,"
-                      " my %d != server's %d\n",
-                      ENTITY_NAME(con->peer_name),
-                      ceph_pr_addr(&con->peer_addr),
-                      le32_to_cpu(con->out_connect.protocol_version),
-                      le32_to_cpu(con->in_reply.protocol_version));
-               con->error_msg = "protocol version mismatch";
-               reset_connection(con);
-               return -1;
-
-       case CEPH_MSGR_TAG_BADAUTHORIZER:
-               con->auth_retry++;
-               dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
-                    con->auth_retry);
-               if (con->auth_retry == 2) {
-                       con->error_msg = "connect authorization failure";
-                       return -1;
-               }
-               con_out_kvec_reset(con);
-               ret = prepare_write_connect(con);
-               if (ret < 0)
-                       return ret;
-               prepare_read_connect(con);
-               break;
-
-       case CEPH_MSGR_TAG_RESETSESSION:
-               /*
-                * If we connected with a large connect_seq but the peer
-                * has no record of a session with us (no connection, or
-                * connect_seq == 0), they will send RESETSESION to indicate
-                * that they must have reset their session, and may have
-                * dropped messages.
-                */
-               dout("process_connect got RESET peer seq %u\n",
-                    le32_to_cpu(con->in_reply.connect_seq));
-               pr_err("%s%lld %s connection reset\n",
-                      ENTITY_NAME(con->peer_name),
-                      ceph_pr_addr(&con->peer_addr));
-               reset_connection(con);
-               con_out_kvec_reset(con);
-               ret = prepare_write_connect(con);
-               if (ret < 0)
-                       return ret;
-               prepare_read_connect(con);
-
-               /* Tell ceph about it. */
-               mutex_unlock(&con->mutex);
-               pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
-               if (con->ops->peer_reset)
-                       con->ops->peer_reset(con);
-               mutex_lock(&con->mutex);
-               if (con->state != CON_STATE_NEGOTIATING)
-                       return -EAGAIN;
-               break;
-
-       case CEPH_MSGR_TAG_RETRY_SESSION:
-               /*
-                * If we sent a smaller connect_seq than the peer has, try
-                * again with a larger value.
-                */
-               dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
-                    le32_to_cpu(con->out_connect.connect_seq),
-                    le32_to_cpu(con->in_reply.connect_seq));
-               con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
-               con_out_kvec_reset(con);
-               ret = prepare_write_connect(con);
-               if (ret < 0)
-                       return ret;
-               prepare_read_connect(con);
-               break;
-
-       case CEPH_MSGR_TAG_RETRY_GLOBAL:
-               /*
-                * If we sent a smaller global_seq than the peer has, try
-                * again with a larger value.
-                */
-               dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
-                    con->peer_global_seq,
-                    le32_to_cpu(con->in_reply.global_seq));
-               get_global_seq(con->msgr,
-                              le32_to_cpu(con->in_reply.global_seq));
-               con_out_kvec_reset(con);
-               ret = prepare_write_connect(con);
-               if (ret < 0)
-                       return ret;
-               prepare_read_connect(con);
-               break;
-
-       case CEPH_MSGR_TAG_SEQ:
-       case CEPH_MSGR_TAG_READY:
-               if (req_feat & ~server_feat) {
-                       pr_err("%s%lld %s protocol feature mismatch,"
-                              " my required %llx > server's %llx, need %llx\n",
-                              ENTITY_NAME(con->peer_name),
-                              ceph_pr_addr(&con->peer_addr),
-                              req_feat, server_feat, req_feat & ~server_feat);
-                       con->error_msg = "missing required protocol features";
-                       reset_connection(con);
-                       return -1;
+               if (delim == ']') {
+                       if (*p != ']') {
+                               dout("missing matching ']'\n");
+                               goto bad;
+                       }
+                       p++;
                }
 
-               WARN_ON(con->state != CON_STATE_NEGOTIATING);
-               con->state = CON_STATE_OPEN;
-               con->auth_retry = 0;    /* we authenticated; clear flag */
-               con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
-               con->connect_seq++;
-               con->peer_features = server_feat;
-               dout("process_connect got READY gseq %d cseq %d (%d)\n",
-                    con->peer_global_seq,
-                    le32_to_cpu(con->in_reply.connect_seq),
-                    con->connect_seq);
-               WARN_ON(con->connect_seq !=
-                       le32_to_cpu(con->in_reply.connect_seq));
-
-               if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
-                       con_flag_set(con, CON_FLAG_LOSSYTX);
-
-               con->delay = 0;      /* reset backoff memory */
-
-               if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
-                       prepare_write_seq(con);
-                       prepare_read_seq(con);
+               /* port? */
+               if (p < end && *p == ':') {
+                       port = 0;
+                       p++;
+                       while (p < end && *p >= '0' && *p <= '9') {
+                               port = (port * 10) + (*p - '0');
+                               p++;
+                       }
+                       if (port == 0)
+                               port = CEPH_MON_PORT;
+                       else if (port > 65535)
+                               goto bad;
                } else {
-                       prepare_read_tag(con);
+                       port = CEPH_MON_PORT;
                }
-               break;
 
-       case CEPH_MSGR_TAG_WAIT:
+               ceph_addr_set_port(&addr[i], port);
                /*
-                * If there is a connection race (we are opening
-                * connections to each other), one of us may just have
-                * to WAIT.  This shouldn't happen if we are the
-                * client.
+                * We want the type to be set according to ms_mode
+                * option, but options are normally parsed after mon
+                * addresses.  Rather than complicating parsing, set
+                * to LEGACY and override in build_initial_monmap()
+                * for mon addresses and ceph_messenger_init() for
+                * ip option.
                 */
-               con->error_msg = "protocol error, got WAIT as client";
-               return -1;
-
-       default:
-               con->error_msg = "protocol error, garbage tag during connect";
-               return -1;
-       }
-       return 0;
-}
-
-
-/*
- * read (part of) an ack
- */
-static int read_partial_ack(struct ceph_connection *con)
-{
-       int size = sizeof (con->in_temp_ack);
-       int end = size;
-
-       return read_partial(con, end, size, &con->in_temp_ack);
-}
+               addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
+               addr[i].nonce = 0;
 
-/*
- * We can finally discard anything that's been acked.
- */
-static void process_ack(struct ceph_connection *con)
-{
-       struct ceph_msg *m;
-       u64 ack = le64_to_cpu(con->in_temp_ack);
-       u64 seq;
-       bool reconnect = (con->in_tag == CEPH_MSGR_TAG_SEQ);
-       struct list_head *list = reconnect ? &con->out_queue : &con->out_sent;
+               dout("parse_ips got %s\n", ceph_pr_addr(&addr[i]));
 
-       /*
-        * In the reconnect case, con_fault() has requeued messages
-        * in out_sent. We should cleanup old messages according to
-        * the reconnect seq.
-        */
-       while (!list_empty(list)) {
-               m = list_first_entry(list, struct ceph_msg, list_head);
-               if (reconnect && m->needs_out_seq)
-                       break;
-               seq = le64_to_cpu(m->hdr.seq);
-               if (seq > ack)
+               if (p == end)
                        break;
-               dout("got ack for seq %llu type %d at %p\n", seq,
-                    le16_to_cpu(m->hdr.type), m);
-               m->ack_stamp = jiffies;
-               ceph_msg_remove(m);
-       }
-
-       prepare_read_tag(con);
-}
-
-
-static int read_partial_message_section(struct ceph_connection *con,
-                                       struct kvec *section,
-                                       unsigned int sec_len, u32 *crc)
-{
-       int ret, left;
-
-       BUG_ON(!section);
-
-       while (section->iov_len < sec_len) {
-               BUG_ON(section->iov_base == NULL);
-               left = sec_len - section->iov_len;
-               ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
-                                      section->iov_len, left);
-               if (ret <= 0)
-                       return ret;
-               section->iov_len += ret;
-       }
-       if (section->iov_len == sec_len)
-               *crc = crc32c(0, section->iov_base, section->iov_len);
-
-       return 1;
-}
-
-static int read_partial_msg_data(struct ceph_connection *con)
-{
-       struct ceph_msg *msg = con->in_msg;
-       struct ceph_msg_data_cursor *cursor = &msg->cursor;
-       bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
-       struct page *page;
-       size_t page_offset;
-       size_t length;
-       u32 crc = 0;
-       int ret;
-
-       if (!msg->num_data_items)
-               return -EIO;
-
-       if (do_datacrc)
-               crc = con->in_data_crc;
-       while (cursor->total_resid) {
-               if (!cursor->resid) {
-                       ceph_msg_data_advance(cursor, 0);
-                       continue;
-               }
-
-               page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
-               ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
-               if (ret <= 0) {
-                       if (do_datacrc)
-                               con->in_data_crc = crc;
-
-                       return ret;
-               }
-
-               if (do_datacrc)
-                       crc = ceph_crc32c_page(crc, page, page_offset, ret);
-               ceph_msg_data_advance(cursor, (size_t)ret);
-       }
-       if (do_datacrc)
-               con->in_data_crc = crc;
-
-       return 1;       /* must return > 0 to indicate success */
-}
-
-/*
- * read (part of) a message.
- */
-static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
-
-static int read_partial_message(struct ceph_connection *con)
-{
-       struct ceph_msg *m = con->in_msg;
-       int size;
-       int end;
-       int ret;
-       unsigned int front_len, middle_len, data_len;
-       bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
-       bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
-       u64 seq;
-       u32 crc;
-
-       dout("read_partial_message con %p msg %p\n", con, m);
-
-       /* header */
-       size = sizeof (con->in_hdr);
-       end = size;
-       ret = read_partial(con, end, size, &con->in_hdr);
-       if (ret <= 0)
-               return ret;
-
-       crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc));
-       if (cpu_to_le32(crc) != con->in_hdr.crc) {
-               pr_err("read_partial_message bad hdr crc %u != expected %u\n",
-                      crc, con->in_hdr.crc);
-               return -EBADMSG;
-       }
-
-       front_len = le32_to_cpu(con->in_hdr.front_len);
-       if (front_len > CEPH_MSG_MAX_FRONT_LEN)
-               return -EIO;
-       middle_len = le32_to_cpu(con->in_hdr.middle_len);
-       if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
-               return -EIO;
-       data_len = le32_to_cpu(con->in_hdr.data_len);
-       if (data_len > CEPH_MSG_MAX_DATA_LEN)
-               return -EIO;
-
-       /* verify seq# */
-       seq = le64_to_cpu(con->in_hdr.seq);
-       if ((s64)seq - (s64)con->in_seq < 1) {
-               pr_info("skipping %s%lld %s seq %lld expected %lld\n",
-                       ENTITY_NAME(con->peer_name),
-                       ceph_pr_addr(&con->peer_addr),
-                       seq, con->in_seq + 1);
-               con->in_base_pos = -front_len - middle_len - data_len -
-                       sizeof_footer(con);
-               con->in_tag = CEPH_MSGR_TAG_READY;
-               return 1;
-       } else if ((s64)seq - (s64)con->in_seq > 1) {
-               pr_err("read_partial_message bad seq %lld expected %lld\n",
-                      seq, con->in_seq + 1);
-               con->error_msg = "bad message sequence # for incoming message";
-               return -EBADE;
-       }
-
-       /* allocate message? */
-       if (!con->in_msg) {
-               int skip = 0;
-
-               dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
-                    front_len, data_len);
-               ret = ceph_con_in_msg_alloc(con, &skip);
-               if (ret < 0)
-                       return ret;
-
-               BUG_ON(!con->in_msg ^ skip);
-               if (skip) {
-                       /* skip this message */
-                       dout("alloc_msg said skip message\n");
-                       con->in_base_pos = -front_len - middle_len - data_len -
-                               sizeof_footer(con);
-                       con->in_tag = CEPH_MSGR_TAG_READY;
-                       con->in_seq++;
-                       return 1;
-               }
-
-               BUG_ON(!con->in_msg);
-               BUG_ON(con->in_msg->con != con);
-               m = con->in_msg;
-               m->front.iov_len = 0;    /* haven't read it yet */
-               if (m->middle)
-                       m->middle->vec.iov_len = 0;
-
-               /* prepare for data payload, if any */
-
-               if (data_len)
-                       prepare_message_data(con->in_msg, data_len);
-       }
-
-       /* front */
-       ret = read_partial_message_section(con, &m->front, front_len,
-                                          &con->in_front_crc);
-       if (ret <= 0)
-               return ret;
-
-       /* middle */
-       if (m->middle) {
-               ret = read_partial_message_section(con, &m->middle->vec,
-                                                  middle_len,
-                                                  &con->in_middle_crc);
-               if (ret <= 0)
-                       return ret;
-       }
-
-       /* (page) data */
-       if (data_len) {
-               ret = read_partial_msg_data(con);
-               if (ret <= 0)
-                       return ret;
-       }
-
-       /* footer */
-       size = sizeof_footer(con);
-       end += size;
-       ret = read_partial(con, end, size, &m->footer);
-       if (ret <= 0)
-               return ret;
-
-       if (!need_sign) {
-               m->footer.flags = m->old_footer.flags;
-               m->footer.sig = 0;
+               if (*p != ',')
+                       goto bad;
+               p++;
        }
 
-       dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
-            m, front_len, m->footer.front_crc, middle_len,
-            m->footer.middle_crc, data_len, m->footer.data_crc);
-
-       /* crc ok? */
-       if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
-               pr_err("read_partial_message %p front crc %u != exp. %u\n",
-                      m, con->in_front_crc, m->footer.front_crc);
-               return -EBADMSG;
-       }
-       if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
-               pr_err("read_partial_message %p middle crc %u != exp %u\n",
-                      m, con->in_middle_crc, m->footer.middle_crc);
-               return -EBADMSG;
-       }
-       if (do_datacrc &&
-           (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
-           con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
-               pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
-                      con->in_data_crc, le32_to_cpu(m->footer.data_crc));
-               return -EBADMSG;
-       }
+       if (p != end)
+               goto bad;
 
-       if (need_sign && con->ops->check_message_signature &&
-           con->ops->check_message_signature(m)) {
-               pr_err("read_partial_message %p signature check failed\n", m);
-               return -EBADMSG;
-       }
+       if (count)
+               *count = i + 1;
+       return 0;
 
-       return 1; /* done! */
+bad:
+       return ret;
 }
 
 /*
@@ -2514,7 +1351,7 @@ static int read_partial_message(struct ceph_connection *con)
  * be careful not to do anything that waits on other incoming messages or it
  * may deadlock.
  */
-static void process_message(struct ceph_connection *con)
+void ceph_con_process_message(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->in_msg;
 
@@ -2528,12 +1365,13 @@ static void process_message(struct ceph_connection *con)
        con->in_seq++;
        mutex_unlock(&con->mutex);
 
-       dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
+       dout("===== %p %llu from %s%lld %d=%s len %d+%d+%d (%u %u %u) =====\n",
             msg, le64_to_cpu(msg->hdr.seq),
             ENTITY_NAME(msg->hdr.src),
             le16_to_cpu(msg->hdr.type),
             ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
             le32_to_cpu(msg->hdr.front_len),
+            le32_to_cpu(msg->hdr.middle_len),
             le32_to_cpu(msg->hdr.data_len),
             con->in_front_crc, con->in_middle_crc, con->in_data_crc);
        con->ops->dispatch(con, msg);
@@ -2541,264 +1379,6 @@ static void process_message(struct ceph_connection *con)
        mutex_lock(&con->mutex);
 }
 
-static int read_keepalive_ack(struct ceph_connection *con)
-{
-       struct ceph_timespec ceph_ts;
-       size_t size = sizeof(ceph_ts);
-       int ret = read_partial(con, size, size, &ceph_ts);
-       if (ret <= 0)
-               return ret;
-       ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
-       prepare_read_tag(con);
-       return 1;
-}
-
-/*
- * Write something to the socket.  Called in a worker thread when the
- * socket appears to be writeable and we have something ready to send.
- */
-static int try_write(struct ceph_connection *con)
-{
-       int ret = 1;
-
-       dout("try_write start %p state %lu\n", con, con->state);
-       if (con->state != CON_STATE_PREOPEN &&
-           con->state != CON_STATE_CONNECTING &&
-           con->state != CON_STATE_NEGOTIATING &&
-           con->state != CON_STATE_OPEN)
-               return 0;
-
-       /* open the socket first? */
-       if (con->state == CON_STATE_PREOPEN) {
-               BUG_ON(con->sock);
-               con->state = CON_STATE_CONNECTING;
-
-               con_out_kvec_reset(con);
-               prepare_write_banner(con);
-               prepare_read_banner(con);
-
-               BUG_ON(con->in_msg);
-               con->in_tag = CEPH_MSGR_TAG_READY;
-               dout("try_write initiating connect on %p new state %lu\n",
-                    con, con->state);
-               ret = ceph_tcp_connect(con);
-               if (ret < 0) {
-                       con->error_msg = "connect error";
-                       goto out;
-               }
-       }
-
-more:
-       dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
-       BUG_ON(!con->sock);
-
-       /* kvec data queued? */
-       if (con->out_kvec_left) {
-               ret = write_partial_kvec(con);
-               if (ret <= 0)
-                       goto out;
-       }
-       if (con->out_skip) {
-               ret = write_partial_skip(con);
-               if (ret <= 0)
-                       goto out;
-       }
-
-       /* msg pages? */
-       if (con->out_msg) {
-               if (con->out_msg_done) {
-                       ceph_msg_put(con->out_msg);
-                       con->out_msg = NULL;   /* we're done with this one */
-                       goto do_next;
-               }
-
-               ret = write_partial_message_data(con);
-               if (ret == 1)
-                       goto more;  /* we need to send the footer, too! */
-               if (ret == 0)
-                       goto out;
-               if (ret < 0) {
-                       dout("try_write write_partial_message_data err %d\n",
-                            ret);
-                       goto out;
-               }
-       }
-
-do_next:
-       if (con->state == CON_STATE_OPEN) {
-               if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
-                       prepare_write_keepalive(con);
-                       goto more;
-               }
-               /* is anything else pending? */
-               if (!list_empty(&con->out_queue)) {
-                       prepare_write_message(con);
-                       goto more;
-               }
-               if (con->in_seq > con->in_seq_acked) {
-                       prepare_write_ack(con);
-                       goto more;
-               }
-       }
-
-       /* Nothing to do! */
-       con_flag_clear(con, CON_FLAG_WRITE_PENDING);
-       dout("try_write nothing else to write.\n");
-       ret = 0;
-out:
-       dout("try_write done on %p ret %d\n", con, ret);
-       return ret;
-}
-
-/*
- * Read what we can from the socket.
- */
-static int try_read(struct ceph_connection *con)
-{
-       int ret = -1;
-
-more:
-       dout("try_read start on %p state %lu\n", con, con->state);
-       if (con->state != CON_STATE_CONNECTING &&
-           con->state != CON_STATE_NEGOTIATING &&
-           con->state != CON_STATE_OPEN)
-               return 0;
-
-       BUG_ON(!con->sock);
-
-       dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
-            con->in_base_pos);
-
-       if (con->state == CON_STATE_CONNECTING) {
-               dout("try_read connecting\n");
-               ret = read_partial_banner(con);
-               if (ret <= 0)
-                       goto out;
-               ret = process_banner(con);
-               if (ret < 0)
-                       goto out;
-
-               con->state = CON_STATE_NEGOTIATING;
-
-               /*
-                * Received banner is good, exchange connection info.
-                * Do not reset out_kvec, as sending our banner raced
-                * with receiving peer banner after connect completed.
-                */
-               ret = prepare_write_connect(con);
-               if (ret < 0)
-                       goto out;
-               prepare_read_connect(con);
-
-               /* Send connection info before awaiting response */
-               goto out;
-       }
-
-       if (con->state == CON_STATE_NEGOTIATING) {
-               dout("try_read negotiating\n");
-               ret = read_partial_connect(con);
-               if (ret <= 0)
-                       goto out;
-               ret = process_connect(con);
-               if (ret < 0)
-                       goto out;
-               goto more;
-       }
-
-       WARN_ON(con->state != CON_STATE_OPEN);
-
-       if (con->in_base_pos < 0) {
-               /*
-                * skipping + discarding content.
-                */
-               ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos);
-               if (ret <= 0)
-                       goto out;
-               dout("skipped %d / %d bytes\n", ret, -con->in_base_pos);
-               con->in_base_pos += ret;
-               if (con->in_base_pos)
-                       goto more;
-       }
-       if (con->in_tag == CEPH_MSGR_TAG_READY) {
-               /*
-                * what's next?
-                */
-               ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
-               if (ret <= 0)
-                       goto out;
-               dout("try_read got tag %d\n", (int)con->in_tag);
-               switch (con->in_tag) {
-               case CEPH_MSGR_TAG_MSG:
-                       prepare_read_message(con);
-                       break;
-               case CEPH_MSGR_TAG_ACK:
-                       prepare_read_ack(con);
-                       break;
-               case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
-                       prepare_read_keepalive_ack(con);
-                       break;
-               case CEPH_MSGR_TAG_CLOSE:
-                       con_close_socket(con);
-                       con->state = CON_STATE_CLOSED;
-                       goto out;
-               default:
-                       goto bad_tag;
-               }
-       }
-       if (con->in_tag == CEPH_MSGR_TAG_MSG) {
-               ret = read_partial_message(con);
-               if (ret <= 0) {
-                       switch (ret) {
-                       case -EBADMSG:
-                               con->error_msg = "bad crc/signature";
-                               fallthrough;
-                       case -EBADE:
-                               ret = -EIO;
-                               break;
-                       case -EIO:
-                               con->error_msg = "io error";
-                               break;
-                       }
-                       goto out;
-               }
-               if (con->in_tag == CEPH_MSGR_TAG_READY)
-                       goto more;
-               process_message(con);
-               if (con->state == CON_STATE_OPEN)
-                       prepare_read_tag(con);
-               goto more;
-       }
-       if (con->in_tag == CEPH_MSGR_TAG_ACK ||
-           con->in_tag == CEPH_MSGR_TAG_SEQ) {
-               /*
-                * the final handshake seq exchange is semantically
-                * equivalent to an ACK
-                */
-               ret = read_partial_ack(con);
-               if (ret <= 0)
-                       goto out;
-               process_ack(con);
-               goto more;
-       }
-       if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
-               ret = read_keepalive_ack(con);
-               if (ret <= 0)
-                       goto out;
-               goto more;
-       }
-
-out:
-       dout("try_read done on %p ret %d\n", con, ret);
-       return ret;
-
-bad_tag:
-       pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
-       con->error_msg = "protocol error, garbage tag";
-       ret = -1;
-       goto out;
-}
-
-
 /*
  * Atomically queue work on a connection after the specified delay.
  * Bump @con reference to avoid races with connection teardown.
@@ -2811,6 +1391,9 @@ static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
                return -ENOENT;
        }
 
+       if (delay >= HZ)
+               delay = round_jiffies_relative(delay);
+
        dout("%s %p %lu\n", __func__, con, delay);
        if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
                dout("%s %p - already queued\n", __func__, con);
@@ -2836,27 +1419,30 @@ static void cancel_con(struct ceph_connection *con)
 
 static bool con_sock_closed(struct ceph_connection *con)
 {
-       if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
+       if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED))
                return false;
 
 #define CASE(x)                                                                \
-       case CON_STATE_ ## x:                                           \
+       case CEPH_CON_S_ ## x:                                          \
                con->error_msg = "socket closed (con state " #x ")";    \
                break;
 
        switch (con->state) {
        CASE(CLOSED);
        CASE(PREOPEN);
-       CASE(CONNECTING);
-       CASE(NEGOTIATING);
+       CASE(V1_BANNER);
+       CASE(V1_CONNECT_MSG);
+       CASE(V2_BANNER_PREFIX);
+       CASE(V2_BANNER_PAYLOAD);
+       CASE(V2_HELLO);
+       CASE(V2_AUTH);
+       CASE(V2_AUTH_SIGNATURE);
+       CASE(V2_SESSION_CONNECT);
+       CASE(V2_SESSION_RECONNECT);
        CASE(OPEN);
        CASE(STANDBY);
        default:
-               pr_warn("%s con %p unrecognized state %lu\n",
-                       __func__, con, con->state);
-               con->error_msg = "unrecognized con state";
                BUG();
-               break;
        }
 #undef CASE
 
@@ -2867,15 +1453,15 @@ static bool con_backoff(struct ceph_connection *con)
 {
        int ret;
 
-       if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF))
+       if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF))
                return false;
 
-       ret = queue_con_delay(con, round_jiffies_relative(con->delay));
+       ret = queue_con_delay(con, con->delay);
        if (ret) {
                dout("%s: con %p FAILED to back off %lu\n", __func__,
                        con, con->delay);
                BUG_ON(ret == -ENOENT);
-               con_flag_set(con, CON_FLAG_BACKOFF);
+               ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
        }
 
        return true;
@@ -2891,11 +1477,11 @@ static void con_fault_finish(struct ceph_connection *con)
         * in case we faulted due to authentication, invalidate our
         * current tickets so that we can get new ones.
         */
-       if (con->auth_retry) {
-               dout("auth_retry %d, invalidating\n", con->auth_retry);
+       if (con->v1.auth_retry) {
+               dout("auth_retry %d, invalidating\n", con->v1.auth_retry);
                if (con->ops->invalidate_authorizer)
                        con->ops->invalidate_authorizer(con);
-               con->auth_retry = 0;
+               con->v1.auth_retry = 0;
        }
 
        if (con->ops->fault)
@@ -2923,21 +1509,24 @@ static void ceph_con_workfn(struct work_struct *work)
                        dout("%s: con %p BACKOFF\n", __func__, con);
                        break;
                }
-               if (con->state == CON_STATE_STANDBY) {
+               if (con->state == CEPH_CON_S_STANDBY) {
                        dout("%s: con %p STANDBY\n", __func__, con);
                        break;
                }
-               if (con->state == CON_STATE_CLOSED) {
+               if (con->state == CEPH_CON_S_CLOSED) {
                        dout("%s: con %p CLOSED\n", __func__, con);
                        BUG_ON(con->sock);
                        break;
                }
-               if (con->state == CON_STATE_PREOPEN) {
+               if (con->state == CEPH_CON_S_PREOPEN) {
                        dout("%s: con %p PREOPEN\n", __func__, con);
                        BUG_ON(con->sock);
                }
 
-               ret = try_read(con);
+               if (ceph_msgr2(from_msgr(con->msgr)))
+                       ret = ceph_con_v2_try_read(con);
+               else
+                       ret = ceph_con_v1_try_read(con);
                if (ret < 0) {
                        if (ret == -EAGAIN)
                                continue;
@@ -2947,7 +1536,10 @@ static void ceph_con_workfn(struct work_struct *work)
                        break;
                }
 
-               ret = try_write(con);
+               if (ceph_msgr2(from_msgr(con->msgr)))
+                       ret = ceph_con_v2_try_write(con);
+               else
+                       ret = ceph_con_v1_try_write(con);
                if (ret < 0) {
                        if (ret == -EAGAIN)
                                continue;
@@ -2974,64 +1566,54 @@ static void ceph_con_workfn(struct work_struct *work)
  */
 static void con_fault(struct ceph_connection *con)
 {
-       dout("fault %p state %lu to peer %s\n",
+       dout("fault %p state %d to peer %s\n",
             con, con->state, ceph_pr_addr(&con->peer_addr));
 
        pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
                ceph_pr_addr(&con->peer_addr), con->error_msg);
        con->error_msg = NULL;
 
-       WARN_ON(con->state != CON_STATE_CONNECTING &&
-              con->state != CON_STATE_NEGOTIATING &&
-              con->state != CON_STATE_OPEN);
+       WARN_ON(con->state == CEPH_CON_S_STANDBY ||
+               con->state == CEPH_CON_S_CLOSED);
 
-       con_close_socket(con);
+       ceph_con_reset_protocol(con);
 
-       if (con_flag_test(con, CON_FLAG_LOSSYTX)) {
+       if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
                dout("fault on LOSSYTX channel, marking CLOSED\n");
-               con->state = CON_STATE_CLOSED;
+               con->state = CEPH_CON_S_CLOSED;
                return;
        }
 
-       if (con->in_msg) {
-               BUG_ON(con->in_msg->con != con);
-               ceph_msg_put(con->in_msg);
-               con->in_msg = NULL;
-       }
-       if (con->out_msg) {
-               BUG_ON(con->out_msg->con != con);
-               ceph_msg_put(con->out_msg);
-               con->out_msg = NULL;
-       }
-
        /* Requeue anything that hasn't been acked */
        list_splice_init(&con->out_sent, &con->out_queue);
 
        /* If there are no messages queued or keepalive pending, place
         * the connection in a STANDBY state */
        if (list_empty(&con->out_queue) &&
-           !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) {
+           !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
                dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
-               con_flag_clear(con, CON_FLAG_WRITE_PENDING);
-               con->state = CON_STATE_STANDBY;
+               ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
+               con->state = CEPH_CON_S_STANDBY;
        } else {
                /* retry after a delay. */
-               con->state = CON_STATE_PREOPEN;
-               if (con->delay == 0)
+               con->state = CEPH_CON_S_PREOPEN;
+               if (!con->delay) {
                        con->delay = BASE_DELAY_INTERVAL;
-               else if (con->delay < MAX_DELAY_INTERVAL)
+               } else if (con->delay < MAX_DELAY_INTERVAL) {
                        con->delay *= 2;
-               con_flag_set(con, CON_FLAG_BACKOFF);
+                       if (con->delay > MAX_DELAY_INTERVAL)
+                               con->delay = MAX_DELAY_INTERVAL;
+               }
+               ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
                queue_con(con);
        }
 }
 
-
 void ceph_messenger_reset_nonce(struct ceph_messenger *msgr)
 {
        u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000;
        msgr->inst.addr.nonce = cpu_to_le32(nonce);
-       encode_my_addr(msgr);
+       ceph_encode_my_addr(msgr);
 }
 
 /*
@@ -3042,26 +1624,35 @@ void ceph_messenger_init(struct ceph_messenger *msgr,
 {
        spin_lock_init(&msgr->global_seq_lock);
 
-       if (myaddr)
-               msgr->inst.addr = *myaddr;
+       if (myaddr) {
+               memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr,
+                      sizeof(msgr->inst.addr.in_addr));
+               ceph_addr_set_port(&msgr->inst.addr, 0);
+       }
+
+       /*
+        * Since nautilus, clients are identified using type ANY.
+        * For msgr1, ceph_encode_banner_addr() munges it to NONE.
+        */
+       msgr->inst.addr.type = CEPH_ENTITY_ADDR_TYPE_ANY;
 
-       /* select a random nonce */
-       msgr->inst.addr.type = 0;
-       get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
-       encode_my_addr(msgr);
+       /* generate a random non-zero nonce */
+       do {
+               get_random_bytes(&msgr->inst.addr.nonce,
+                                sizeof(msgr->inst.addr.nonce));
+       } while (!msgr->inst.addr.nonce);
+       ceph_encode_my_addr(msgr);
 
        atomic_set(&msgr->stopping, 0);
        write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
 
        dout("%s %p\n", __func__, msgr);
 }
-EXPORT_SYMBOL(ceph_messenger_init);
 
 void ceph_messenger_fini(struct ceph_messenger *msgr)
 {
        put_net(read_pnet(&msgr->net));
 }
-EXPORT_SYMBOL(ceph_messenger_fini);
 
 static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
 {
@@ -3075,17 +1666,19 @@ static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
 static void clear_standby(struct ceph_connection *con)
 {
        /* come back from STANDBY? */
-       if (con->state == CON_STATE_STANDBY) {
+       if (con->state == CEPH_CON_S_STANDBY) {
                dout("clear_standby %p and ++connect_seq\n", con);
-               con->state = CON_STATE_PREOPEN;
-               con->connect_seq++;
-               WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING));
-               WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING));
+               con->state = CEPH_CON_S_PREOPEN;
+               con->v1.connect_seq++;
+               WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
+               WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
        }
 }
 
 /*
  * Queue up an outgoing message on the given connection.
+ *
+ * Consumes a ref on @msg.
  */
 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 {
@@ -3096,7 +1689,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 
        mutex_lock(&con->mutex);
 
-       if (con->state == CON_STATE_CLOSED) {
+       if (con->state == CEPH_CON_S_CLOSED) {
                dout("con_send %p closed, dropping %p\n", con, msg);
                ceph_msg_put(msg);
                mutex_unlock(&con->mutex);
@@ -3119,7 +1712,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 
        /* if there wasn't anything waiting to send before, queue
         * new work */
-       if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
+       if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
                queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_send);
@@ -3137,36 +1730,30 @@ void ceph_msg_revoke(struct ceph_msg *msg)
        }
 
        mutex_lock(&con->mutex);
-       if (!list_empty(&msg->list_head)) {
-               dout("%s %p msg %p - was on queue\n", __func__, con, msg);
-               list_del_init(&msg->list_head);
-               msg->hdr.seq = 0;
-
-               ceph_msg_put(msg);
+       if (list_empty(&msg->list_head)) {
+               WARN_ON(con->out_msg == msg);
+               dout("%s con %p msg %p not linked\n", __func__, con, msg);
+               mutex_unlock(&con->mutex);
+               return;
        }
+
+       dout("%s con %p msg %p was linked\n", __func__, con, msg);
+       msg->hdr.seq = 0;
+       ceph_msg_remove(msg);
+
        if (con->out_msg == msg) {
-               BUG_ON(con->out_skip);
-               /* footer */
-               if (con->out_msg_done) {
-                       con->out_skip += con_out_kvec_skip(con);
-               } else {
-                       BUG_ON(!msg->data_length);
-                       con->out_skip += sizeof_footer(con);
-               }
-               /* data, middle, front */
-               if (msg->data_length)
-                       con->out_skip += msg->cursor.total_resid;
-               if (msg->middle)
-                       con->out_skip += con_out_kvec_skip(con);
-               con->out_skip += con_out_kvec_skip(con);
-
-               dout("%s %p msg %p - was sending, will write %d skip %d\n",
-                    __func__, con, msg, con->out_kvec_bytes, con->out_skip);
-               msg->hdr.seq = 0;
+               WARN_ON(con->state != CEPH_CON_S_OPEN);
+               dout("%s con %p msg %p was sending\n", __func__, con, msg);
+               if (ceph_msgr2(from_msgr(con->msgr)))
+                       ceph_con_v2_revoke(con);
+               else
+                       ceph_con_v1_revoke(con);
+               ceph_msg_put(con->out_msg);
                con->out_msg = NULL;
-               ceph_msg_put(msg);
+       } else {
+               dout("%s con %p msg %p not current, out_msg %p\n", __func__,
+                    con, msg, con->out_msg);
        }
-
        mutex_unlock(&con->mutex);
 }
 
@@ -3184,25 +1771,17 @@ void ceph_msg_revoke_incoming(struct ceph_msg *msg)
 
        mutex_lock(&con->mutex);
        if (con->in_msg == msg) {
-               unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
-               unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
-               unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
-
-               /* skip rest of message */
-               dout("%s %p msg %p revoked\n", __func__, con, msg);
-               con->in_base_pos = con->in_base_pos -
-                               sizeof(struct ceph_msg_header) -
-                               front_len -
-                               middle_len -
-                               data_len -
-                               sizeof(struct ceph_msg_footer);
+               WARN_ON(con->state != CEPH_CON_S_OPEN);
+               dout("%s con %p msg %p was recving\n", __func__, con, msg);
+               if (ceph_msgr2(from_msgr(con->msgr)))
+                       ceph_con_v2_revoke_incoming(con);
+               else
+                       ceph_con_v1_revoke_incoming(con);
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
-               con->in_tag = CEPH_MSGR_TAG_READY;
-               con->in_seq++;
        } else {
-               dout("%s %p in_msg %p msg %p no-op\n",
-                    __func__, con, con->in_msg, msg);
+               dout("%s con %p msg %p not current, in_msg %p\n", __func__,
+                    con, msg, con->in_msg);
        }
        mutex_unlock(&con->mutex);
 }
@@ -3215,10 +1794,10 @@ void ceph_con_keepalive(struct ceph_connection *con)
        dout("con_keepalive %p\n", con);
        mutex_lock(&con->mutex);
        clear_standby(con);
-       con_flag_set(con, CON_FLAG_KEEPALIVE_PENDING);
+       ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
        mutex_unlock(&con->mutex);
 
-       if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0)
+       if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
                queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_keepalive);
@@ -3424,9 +2003,9 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
  * On error (ENOMEM, EAGAIN, ...),
  *  - con->in_msg == NULL
  */
-static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
+int ceph_con_in_msg_alloc(struct ceph_connection *con,
+                         struct ceph_msg_header *hdr, int *skip)
 {
-       struct ceph_msg_header *hdr = &con->in_hdr;
        int middle_len = le32_to_cpu(hdr->middle_len);
        struct ceph_msg *msg;
        int ret = 0;
@@ -3437,7 +2016,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
        mutex_unlock(&con->mutex);
        msg = con->ops->alloc_msg(con, hdr, skip);
        mutex_lock(&con->mutex);
-       if (con->state != CON_STATE_OPEN) {
+       if (con->state != CEPH_CON_S_OPEN) {
                if (msg)
                        ceph_msg_put(msg);
                return -EAGAIN;
@@ -3458,7 +2037,7 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
                con->error_msg = "error allocating memory for incoming message";
                return -ENOMEM;
        }
-       memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
+       memcpy(&con->in_msg->hdr, hdr, sizeof(*hdr));
 
        if (middle_len && !con->in_msg->middle) {
                ret = ceph_alloc_middle(con, con->in_msg);
@@ -3471,6 +2050,39 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
        return ret;
 }
 
+void ceph_con_get_out_msg(struct ceph_connection *con)
+{
+       struct ceph_msg *msg;
+
+       BUG_ON(list_empty(&con->out_queue));
+       msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
+       WARN_ON(msg->con != con);
+
+       /*
+        * Put the message on "sent" list using a ref from ceph_con_send().
+        * It is put when the message is acked or revoked.
+        */
+       list_move_tail(&msg->list_head, &con->out_sent);
+
+       /*
+        * Only assign outgoing seq # if we haven't sent this message
+        * yet.  If it is requeued, resend with it's original seq.
+        */
+       if (msg->needs_out_seq) {
+               msg->hdr.seq = cpu_to_le64(++con->out_seq);
+               msg->needs_out_seq = false;
+
+               if (con->ops->reencode_message)
+                       con->ops->reencode_message(msg);
+       }
+
+       /*
+        * Get a ref for out_msg.  It is put when we are done sending the
+        * message or in case of a fault.
+        */
+       WARN_ON(con->out_msg);
+       con->out_msg = ceph_msg_get(msg);
+}
 
 /*
  * Free a generically kmalloc'd message.