ceph: split large reconnect into multiple messages
author: Yan, Zheng <zyan@redhat.com>
Tue, 1 Jan 2019 08:28:33 +0000 (16:28 +0800)
committer: Ilya Dryomov <idryomov@gmail.com>
Tue, 5 Mar 2019 17:55:16 +0000 (18:55 +0100)
Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/caps.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/snap.c

index bba28a5..0eaf1b4 100644 (file)
@@ -2393,6 +2393,12 @@ void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
                if ((cap->issued & ci->i_flushing_caps) !=
                    ci->i_flushing_caps) {
                        ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+                       /* encode_caps_cb() will also reset these sequence
+                        * numbers. Make sure the sequence numbers in the cap
+                        * flush message match the later reconnect message. */
+                       cap->seq = 0;
+                       cap->issue_seq = 0;
+                       cap->mseq = 0;
                        __kick_flushing_caps(mdsc, session, ci,
                                             oldest_flush_tid);
                } else {
index 04f1809..cce4e4b 100644 (file)
@@ -20,6 +20,8 @@
 #include <linux/ceph/auth.h>
 #include <linux/ceph/debugfs.h>
 
+#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
+
 /*
  * A cluster of MDS (metadata server) daemons is responsible for
  * managing the file system namespace (the directory hierarchy and
  */
 
 struct ceph_reconnect_state {
-       int nr_caps;
+       struct ceph_mds_session *session;
+       int nr_caps, nr_realms;
        struct ceph_pagelist *pagelist;
        unsigned msg_version;
+       bool allow_multi;
 };
 
 static void __wake_requests(struct ceph_mds_client *mdsc,
@@ -2985,6 +2989,82 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
        mutex_unlock(&mdsc->mutex);
 }
 
+static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
+{
+       struct ceph_msg *reply;
+       struct ceph_pagelist *_pagelist;
+       struct page *page;
+       __le32 *addr;
+       int err = -ENOMEM;
+
+       if (!recon_state->allow_multi)
+               return -ENOSPC;
+
+       /* exactly one of nr_caps and nr_realms must be non-zero here */
+       BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
+
+       /* pre-allocate the pagelist that replaces the one sent below */
+       _pagelist = ceph_pagelist_alloc(GFP_NOFS);
+       if (!_pagelist)
+               return -ENOMEM;
+
+       reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
+       if (!reply)
+               goto fail_msg;
+
+       /* placeholder for nr_caps in the new pagelist */
+       err = ceph_pagelist_encode_32(_pagelist, 0);
+       if (err < 0)
+               goto fail;
+
+       if (recon_state->nr_caps) {
+               /* currently encoding caps; presumably a zero realm count */
+               err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
+               if (err)
+                       goto fail;
+       } else {
+               /* placeholder for nr_realms (currently encoding realms) */
+               err = ceph_pagelist_encode_32(_pagelist, 0);
+               if (err < 0)
+                       goto fail;
+       }
+
+       err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
+       if (err)
+               goto fail;
+
+       page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
+       addr = kmap_atomic(page);
+       if (recon_state->nr_caps) {
+               /* currently encoding caps: patch the count at offset 0 */
+               *addr = cpu_to_le32(recon_state->nr_caps);
+       } else {
+               /* currently encoding realms: count is the second u32 */
+               *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
+       }
+       kunmap_atomic(addr);
+
+       reply->hdr.version = cpu_to_le16(5);
+       reply->hdr.compat_version = cpu_to_le16(4);
+
+       reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
+       ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
+
+       ceph_con_send(&recon_state->session->s_con, reply);
+       ceph_pagelist_release(recon_state->pagelist);
+
+       recon_state->pagelist = _pagelist;
+       recon_state->nr_caps = 0;
+       recon_state->nr_realms = 0;
+       recon_state->msg_version = 5;
+       return 0;
+fail:
+       ceph_msg_put(reply);
+fail_msg:
+       ceph_pagelist_release(_pagelist);
+       return err;
+}
+
 /*
  * Encode information about a cap for a reconnect with the MDS.
  */
@@ -3004,9 +3084,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
        dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
             inode, ceph_vinop(inode), cap, cap->cap_id,
             ceph_cap_string(cap->issued));
-       err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
-       if (err)
-               return err;
 
        spin_lock(&ci->i_ceph_lock);
        cap->seq = 0;        /* reset cap seq */
@@ -3046,7 +3123,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
        if (recon_state->msg_version >= 2) {
                int num_fcntl_locks, num_flock_locks;
                struct ceph_filelock *flocks = NULL;
-               size_t struct_len, total_len = 0;
+               size_t struct_len, total_len = sizeof(u64);
                u8 struct_v = 0;
 
 encode_again:
@@ -3081,7 +3158,7 @@ encode_again:
 
                if (recon_state->msg_version >= 3) {
                        /* version, compat_version and struct_len */
-                       total_len = 2 * sizeof(u8) + sizeof(u32);
+                       total_len += 2 * sizeof(u8) + sizeof(u32);
                        struct_v = 2;
                }
                /*
@@ -3098,12 +3175,19 @@ encode_again:
                        struct_len += sizeof(u64); /* snap_follows */
 
                total_len += struct_len;
-               err = ceph_pagelist_reserve(pagelist, total_len);
-               if (err) {
-                       kfree(flocks);
-                       goto out_err;
+
+               if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
+                       err = send_reconnect_partial(recon_state);
+                       if (err)
+                               goto out_freeflocks;
+                       pagelist = recon_state->pagelist;
                }
 
+               err = ceph_pagelist_reserve(pagelist, total_len);
+               if (err)
+                       goto out_freeflocks;
+
+               ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
                if (recon_state->msg_version >= 3) {
                        ceph_pagelist_encode_8(pagelist, struct_v);
                        ceph_pagelist_encode_8(pagelist, 1);
@@ -3115,7 +3199,7 @@ encode_again:
                                       num_fcntl_locks, num_flock_locks);
                if (struct_v >= 2)
                        ceph_pagelist_encode_64(pagelist, snap_follows);
-
+out_freeflocks:
                kfree(flocks);
        } else {
                u64 pathbase = 0;
@@ -3136,20 +3220,81 @@ encode_again:
                }
 
                err = ceph_pagelist_reserve(pagelist,
-                               pathlen + sizeof(u32) + sizeof(rec.v1));
+                                           sizeof(u64) + sizeof(u32) +
+                                           pathlen + sizeof(rec.v1));
                if (err) {
-                       kfree(path);
-                       goto out_err;
+                       goto out_freepath;
                }
 
+               ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
                ceph_pagelist_encode_string(pagelist, path, pathlen);
                ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
-
+out_freepath:
                kfree(path);
        }
 
-       recon_state->nr_caps++;
 out_err:
+       if (err >= 0)
+               recon_state->nr_caps++;
+       return err;
+}
+
+static int encode_snap_realms(struct ceph_mds_client *mdsc,
+                             struct ceph_reconnect_state *recon_state)
+{
+       struct rb_node *p;
+       struct ceph_pagelist *pagelist = recon_state->pagelist;
+       int err = 0;
+
+       if (recon_state->msg_version >= 4) {
+               err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
+               if (err < 0)
+                       goto fail;
+       }
+
+       /*
+        * Snap realms: send the mds the ino, seq (version) and parent of
+        * every realm we have; if the mds has any newer info, it will tell
+        * us.  v4+ records are prefixed by version, compat_version, struct_len.
+        */
+       for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
+               struct ceph_snap_realm *realm =
+                      rb_entry(p, struct ceph_snap_realm, node);
+               struct ceph_mds_snaprealm_reconnect sr_rec;
+
+               if (recon_state->msg_version >= 4) {
+                       size_t need = sizeof(u8) * 2 + sizeof(u32) +
+                                     sizeof(sr_rec);
+
+                       if (pagelist->length + need > RECONNECT_MAX_SIZE) {
+                               err = send_reconnect_partial(recon_state);
+                               if (err)
+                                       goto fail;
+                               pagelist = recon_state->pagelist;
+                       }
+
+                       err = ceph_pagelist_reserve(pagelist, need);
+                       if (err)
+                               goto fail;
+
+                       ceph_pagelist_encode_8(pagelist, 1);
+                       ceph_pagelist_encode_8(pagelist, 1);
+                       ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
+               }
+
+               dout(" adding snap realm %llx seq %lld parent %llx\n",
+                    realm->ino, realm->seq, realm->parent_ino);
+               sr_rec.ino = cpu_to_le64(realm->ino);
+               sr_rec.seq = cpu_to_le64(realm->seq);
+               sr_rec.parent = cpu_to_le64(realm->parent_ino);
+
+               err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
+               if (err)
+                       goto fail;
+
+               recon_state->nr_realms++;
+       }
+fail:
+       return err;
+}
 
@@ -3170,18 +3315,17 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *session)
 {
        struct ceph_msg *reply;
-       struct rb_node *p;
        int mds = session->s_mds;
        int err = -ENOMEM;
-       int s_nr_caps;
-       struct ceph_pagelist *pagelist;
-       struct ceph_reconnect_state recon_state;
+       struct ceph_reconnect_state recon_state = {
+               .session = session,
+       };
        LIST_HEAD(dispose);
 
        pr_info("mds%d reconnect start\n", mds);
 
-       pagelist = ceph_pagelist_alloc(GFP_NOFS);
-       if (!pagelist)
+       recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
+       if (!recon_state.pagelist)
                goto fail_nopagelist;
 
        reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
@@ -3225,63 +3369,90 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        /* replay unsafe requests */
        replay_unsafe_requests(mdsc, session);
 
+       ceph_early_kick_flushing_caps(mdsc, session);
+
        down_read(&mdsc->snap_rwsem);
 
-       /* traverse this session's caps */
-       s_nr_caps = session->s_nr_caps;
-       err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
+       /* placeholder for nr_caps */
+       err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
        if (err)
                goto fail;
 
-       recon_state.nr_caps = 0;
-       recon_state.pagelist = pagelist;
-       if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
+       if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
                recon_state.msg_version = 3;
-       else
+               recon_state.allow_multi = true;
+       } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
+               recon_state.msg_version = 3;
+       } else {
                recon_state.msg_version = 2;
+       }
+       /* traverse this session's caps */
        err = iterate_session_caps(session, encode_caps_cb, &recon_state);
-       if (err < 0)
-               goto fail;
 
        spin_lock(&session->s_cap_lock);
        session->s_cap_reconnect = 0;
        spin_unlock(&session->s_cap_lock);
 
-       /*
-        * snaprealms.  we provide mds with the ino, seq (version), and
-        * parent for all of our realms.  If the mds has any newer info,
-        * it will tell us.
-        */
-       for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
-               struct ceph_snap_realm *realm =
-                       rb_entry(p, struct ceph_snap_realm, node);
-               struct ceph_mds_snaprealm_reconnect sr_rec;
+       if (err < 0)
+               goto fail;
 
-               dout(" adding snap realm %llx seq %lld parent %llx\n",
-                    realm->ino, realm->seq, realm->parent_ino);
-               sr_rec.ino = cpu_to_le64(realm->ino);
-               sr_rec.seq = cpu_to_le64(realm->seq);
-               sr_rec.parent = cpu_to_le64(realm->parent_ino);
-               err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
-               if (err)
-                       goto fail;
+       /* check if all realms can be encoded into current message */
+       if (mdsc->num_snap_realms) {
+               size_t total_len =
+                       recon_state.pagelist->length +
+                       mdsc->num_snap_realms *
+                       sizeof(struct ceph_mds_snaprealm_reconnect);
+               if (recon_state.msg_version >= 4) {
+                       /* number of realms */
+                       total_len += sizeof(u32);
+                       /* version, compat_version and struct_len */
+                       total_len += mdsc->num_snap_realms *
+                                    (2 * sizeof(u8) + sizeof(u32));
+               }
+               if (total_len > RECONNECT_MAX_SIZE) {
+                       if (!recon_state.allow_multi) {
+                               err = -ENOSPC;
+                               goto fail;
+                       }
+                       if (recon_state.nr_caps) {
+                               err = send_reconnect_partial(&recon_state);
+                               if (err)
+                                       goto fail;
+                       }
+                       recon_state.msg_version = 5;
+               }
        }
 
-       reply->hdr.version = cpu_to_le16(recon_state.msg_version);
+       err = encode_snap_realms(mdsc, &recon_state);
+       if (err < 0)
+               goto fail;
+
+       if (recon_state.msg_version >= 5) {
+               err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
+               if (err < 0)
+                       goto fail;
+       }
 
-       /* raced with cap release? */
-       if (s_nr_caps != recon_state.nr_caps) {
-               struct page *page = list_first_entry(&pagelist->head,
-                                                    struct page, lru);
+       if (recon_state.nr_caps || recon_state.nr_realms) {
+               struct page *page =
+                       list_first_entry(&recon_state.pagelist->head,
+                                       struct page, lru);
                __le32 *addr = kmap_atomic(page);
-               *addr = cpu_to_le32(recon_state.nr_caps);
+               if (recon_state.nr_caps) {
+                       WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
+                       *addr = cpu_to_le32(recon_state.nr_caps);
+               } else if (recon_state.msg_version >= 4) {
+                       *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
+               }
                kunmap_atomic(addr);
        }
 
-       reply->hdr.data_len = cpu_to_le32(pagelist->length);
-       ceph_msg_data_add_pagelist(reply, pagelist);
+       reply->hdr.version = cpu_to_le16(recon_state.msg_version);
+       if (recon_state.msg_version >= 4)
+               reply->hdr.compat_version = cpu_to_le16(4);
 
-       ceph_early_kick_flushing_caps(mdsc, session);
+       reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
+       ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
 
        ceph_con_send(&session->s_con, reply);
 
@@ -3292,7 +3463,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        mutex_unlock(&mdsc->mutex);
 
        up_read(&mdsc->snap_rwsem);
-       ceph_pagelist_release(pagelist);
+       ceph_pagelist_release(recon_state.pagelist);
        return;
 
 fail:
@@ -3300,7 +3471,7 @@ fail:
        up_read(&mdsc->snap_rwsem);
        mutex_unlock(&session->s_mutex);
 fail_nomsg:
-       ceph_pagelist_release(pagelist);
+       ceph_pagelist_release(recon_state.pagelist);
 fail_nopagelist:
        pr_err("error %d preparing reconnect for mds%d\n", err, mds);
        return;
@@ -3698,6 +3869,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        init_rwsem(&mdsc->snap_rwsem);
        mdsc->snap_realms = RB_ROOT;
        INIT_LIST_HEAD(&mdsc->snap_empty);
+       mdsc->num_snap_realms = 0;
        spin_lock_init(&mdsc->snap_empty_lock);
        mdsc->last_tid = 0;
        mdsc->oldest_tid = 0;
index 0d3264c..4f96264 100644 (file)
 #define CEPHFS_FEATURE_REPLY_ENCODING  9
 #define CEPHFS_FEATURE_RECLAIM_CLIENT  10
 #define CEPHFS_FEATURE_LAZY_CAP_WANTED 11
+#define CEPHFS_FEATURE_MULTI_RECONNECT  12
 
 #define CEPHFS_FEATURES_CLIENT_SUPPORTED {     \
        0, 1, 2, 3, 4, 5, 6, 7,                 \
        CEPHFS_FEATURE_MIMIC,                   \
        CEPHFS_FEATURE_LAZY_CAP_WANTED,         \
+       CEPHFS_FEATURE_MULTI_RECONNECT,         \
 }
 #define CEPHFS_FEATURES_CLIENT_REQUIRED {}
 
@@ -342,6 +344,7 @@ struct ceph_mds_client {
        struct rw_semaphore     snap_rwsem;
        struct rb_root          snap_realms;
        struct list_head        snap_empty;
+       int                     num_snap_realms;
        spinlock_t              snap_empty_lock;  /* protect snap_empty */
 
        u64                    last_tid;      /* most recent mds request */
index f74193d..dfc25ce 100644 (file)
@@ -124,6 +124,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
        INIT_LIST_HEAD(&realm->inodes_with_caps);
        spin_lock_init(&realm->inodes_with_caps_lock);
        __insert_snap_realm(&mdsc->snap_realms, realm);
+       mdsc->num_snap_realms++;
+
        dout("create_snap_realm %llx %p\n", realm->ino, realm);
        return realm;
 }
@@ -175,6 +177,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
        dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);
 
        rb_erase(&realm->node, &mdsc->snap_realms);
+       mdsc->num_snap_realms--;
 
        if (realm->parent) {
                list_del_init(&realm->child_item);