ceph: support versioned reply
authorYan, Zheng <zyan@redhat.com>
Wed, 9 Jan 2019 02:10:17 +0000 (10:10 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Tue, 5 Mar 2019 17:55:16 +0000 (18:55 +0100)
In versioned reply, inodestat, dirstat and lease are encoded with
version, compat_version and struct_len.

Based on a patch from Jos Collin <jcollin@redhat.com>.

Link: http://tracker.ceph.com/issues/26936
Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/mds_client.c
fs/ceph/mds_client.h

index f2f5777..e87544e 100644 (file)
@@ -65,6 +65,29 @@ static const struct ceph_connection_operations mds_con_ops;
  * mds reply parsing
  */
 
+static int parse_reply_info_quota(void **p, void *end,
+                                 struct ceph_mds_reply_info_in *info)
+{
+       u8 struct_v, struct_compat;
+       u32 struct_len;
+
+       ceph_decode_8_safe(p, end, struct_v, bad);
+       ceph_decode_8_safe(p, end, struct_compat, bad);
+       /* struct_v is expected to be >= 1. we only
+        * understand encoding with struct_compat == 1. */
+       if (!struct_v || struct_compat != 1)
+               goto bad;
+       ceph_decode_32_safe(p, end, struct_len, bad);
+       ceph_decode_need(p, end, struct_len, bad);
+       end = *p + struct_len;
+       ceph_decode_64_safe(p, end, info->max_bytes, bad);
+       ceph_decode_64_safe(p, end, info->max_files, bad);
+       *p = end;
+       return 0;
+bad:
+       return -EIO;
+}
+
 /*
  * parse individual inode info
  */
@@ -72,8 +95,24 @@ static int parse_reply_info_in(void **p, void *end,
                               struct ceph_mds_reply_info_in *info,
                               u64 features)
 {
-       int err = -EIO;
+       int err = 0;
+       u8 struct_v = 0;
 
+       if (features == (u64)-1) {
+               u32 struct_len;
+               u8 struct_compat;
+               ceph_decode_8_safe(p, end, struct_v, bad);
+               ceph_decode_8_safe(p, end, struct_compat, bad);
+               /* struct_v is expected to be >= 1. we only understand
+                * encoding with struct_compat == 1. */
+               if (!struct_v || struct_compat != 1)
+                       goto bad;
+               ceph_decode_32_safe(p, end, struct_len, bad);
+               ceph_decode_need(p, end, struct_len, bad);
+               end = *p + struct_len;
+       }
+
+       ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
        info->in = *p;
        *p += sizeof(struct ceph_mds_reply_inode) +
                sizeof(*info->in->fragtree.splits) *
@@ -91,49 +130,127 @@ static int parse_reply_info_in(void **p, void *end,
        info->xattr_data = *p;
        *p += info->xattr_len;
 
-       if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+       if (features == (u64)-1) {
+               /* inline data */
                ceph_decode_64_safe(p, end, info->inline_version, bad);
                ceph_decode_32_safe(p, end, info->inline_len, bad);
                ceph_decode_need(p, end, info->inline_len, bad);
                info->inline_data = *p;
                *p += info->inline_len;
-       } else
-               info->inline_version = CEPH_INLINE_NONE;
+               /* quota */
+               err = parse_reply_info_quota(p, end, info);
+               if (err < 0)
+                       goto out_bad;
+               /* pool namespace */
+               ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
+               if (info->pool_ns_len > 0) {
+                       ceph_decode_need(p, end, info->pool_ns_len, bad);
+                       info->pool_ns_data = *p;
+                       *p += info->pool_ns_len;
+               }
+               /* btime, change_attr */
+               {
+                       struct ceph_timespec btime;
+                       u64 change_attr;
+                       ceph_decode_need(p, end, sizeof(btime), bad);
+                       ceph_decode_copy(p, &btime, sizeof(btime));
+                       ceph_decode_64_safe(p, end, change_attr, bad);
+               }
 
-       if (features & CEPH_FEATURE_MDS_QUOTA) {
+               *p = end;
+       } else {
+               if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+                       ceph_decode_64_safe(p, end, info->inline_version, bad);
+                       ceph_decode_32_safe(p, end, info->inline_len, bad);
+                       ceph_decode_need(p, end, info->inline_len, bad);
+                       info->inline_data = *p;
+                       *p += info->inline_len;
+               } else
+                       info->inline_version = CEPH_INLINE_NONE;
+
+               if (features & CEPH_FEATURE_MDS_QUOTA) {
+                       err = parse_reply_info_quota(p, end, info);
+                       if (err < 0)
+                               goto out_bad;
+               } else {
+                       info->max_bytes = 0;
+                       info->max_files = 0;
+               }
+
+               info->pool_ns_len = 0;
+               info->pool_ns_data = NULL;
+               if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
+                       ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
+                       if (info->pool_ns_len > 0) {
+                               ceph_decode_need(p, end, info->pool_ns_len, bad);
+                               info->pool_ns_data = *p;
+                               *p += info->pool_ns_len;
+                       }
+               }
+       }
+       return 0;
+bad:
+       err = -EIO;
+out_bad:
+       return err;
+}
+
+static int parse_reply_info_dir(void **p, void *end,
+                               struct ceph_mds_reply_dirfrag **dirfrag,
+                               u64 features)
+{
+       if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
-
-               /*
-                * both struct_v and struct_compat are expected to be >= 1
-                */
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
-               if (!struct_v || !struct_compat)
+               /* struct_v is expected to be >= 1. we only understand
+                * encoding whose struct_compat == 1. */
+               if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
-               ceph_decode_64_safe(p, end, info->max_bytes, bad);
-               ceph_decode_64_safe(p, end, info->max_files, bad);
-       } else {
-               info->max_bytes = 0;
-               info->max_files = 0;
+               end = *p + struct_len;
        }
 
-       info->pool_ns_len = 0;
-       info->pool_ns_data = NULL;
-       if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
-               ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
-               if (info->pool_ns_len > 0) {
-                       ceph_decode_need(p, end, info->pool_ns_len, bad);
-                       info->pool_ns_data = *p;
-                       *p += info->pool_ns_len;
-               }
+       ceph_decode_need(p, end, sizeof(**dirfrag), bad);
+       *dirfrag = *p;
+       *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
+       if (unlikely(*p > end))
+               goto bad;
+       if (features == (u64)-1)
+               *p = end;
+       return 0;
+bad:
+       return -EIO;
+}
+
+static int parse_reply_info_lease(void **p, void *end,
+                                 struct ceph_mds_reply_lease **lease,
+                                 u64 features)
+{
+       if (features == (u64)-1) {
+               u8 struct_v, struct_compat;
+               u32 struct_len;
+               ceph_decode_8_safe(p, end, struct_v, bad);
+               ceph_decode_8_safe(p, end, struct_compat, bad);
+               /* struct_v is expected to be >= 1. we only understand
+                * encoding whose struct_compat == 1. */
+               if (!struct_v || struct_compat != 1)
+                       goto bad;
+               ceph_decode_32_safe(p, end, struct_len, bad);
+               ceph_decode_need(p, end, struct_len, bad);
+               end = *p + struct_len;
        }
 
+       ceph_decode_need(p, end, sizeof(**lease), bad);
+       *lease = *p;
+       *p += sizeof(**lease);
+       if (features == (u64)-1)
+               *p = end;
        return 0;
 bad:
-       return err;
+       return -EIO;
 }
 
 /*
@@ -151,20 +268,18 @@ static int parse_reply_info_trace(void **p, void *end,
                if (err < 0)
                        goto out_bad;
 
-               if (unlikely(*p + sizeof(*info->dirfrag) > end))
-                       goto bad;
-               info->dirfrag = *p;
-               *p += sizeof(*info->dirfrag) +
-                       sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
-               if (unlikely(*p > end))
-                       goto bad;
+               err = parse_reply_info_dir(p, end, &info->dirfrag, features);
+               if (err < 0)
+                       goto out_bad;
 
                ceph_decode_32_safe(p, end, info->dname_len, bad);
                ceph_decode_need(p, end, info->dname_len, bad);
                info->dname = *p;
                *p += info->dname_len;
-               info->dlease = *p;
-               *p += sizeof(*info->dlease);
+
+               err = parse_reply_info_lease(p, end, &info->dlease, features);
+               if (err < 0)
+                       goto out_bad;
        }
 
        if (info->head->is_target) {
@@ -187,20 +302,16 @@ out_bad:
 /*
  * parse readdir results
  */
-static int parse_reply_info_dir(void **p, void *end,
+static int parse_reply_info_readdir(void **p, void *end,
                                struct ceph_mds_reply_info_parsed *info,
                                u64 features)
 {
        u32 num, i = 0;
        int err;
 
-       info->dir_dir = *p;
-       if (*p + sizeof(*info->dir_dir) > end)
-               goto bad;
-       *p += sizeof(*info->dir_dir) +
-               sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
-       if (*p > end)
-               goto bad;
+       err = parse_reply_info_dir(p, end, &info->dir_dir, features);
+       if (err < 0)
+               goto out_bad;
 
        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
@@ -226,15 +337,16 @@ static int parse_reply_info_dir(void **p, void *end,
        while (num) {
                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
                /* dentry */
-               ceph_decode_need(p, end, sizeof(u32)*2, bad);
-               rde->name_len = ceph_decode_32(p);
+               ceph_decode_32_safe(p, end, rde->name_len, bad);
                ceph_decode_need(p, end, rde->name_len, bad);
                rde->name = *p;
                *p += rde->name_len;
                dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
-               rde->lease = *p;
-               *p += sizeof(struct ceph_mds_reply_lease);
 
+               /* dentry lease */
+               err = parse_reply_info_lease(p, end, &rde->lease, features);
+               if (err)
+                       goto out_bad;
                /* inode */
                err = parse_reply_info_in(p, end, &rde->inode, features);
                if (err < 0)
@@ -285,7 +397,8 @@ static int parse_reply_info_create(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
 {
-       if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
+       if (features == (u64)-1 ||
+           (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
                if (*p == end) {
                        info->has_create_ino = false;
                } else {
@@ -314,7 +427,7 @@ static int parse_reply_info_extra(void **p, void *end,
        if (op == CEPH_MDS_OP_GETFILELOCK)
                return parse_reply_info_filelock(p, end, info, features);
        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
-               return parse_reply_info_dir(p, end, info, features);
+               return parse_reply_info_readdir(p, end, info, features);
        else if (op == CEPH_MDS_OP_CREATE)
                return parse_reply_info_create(p, end, info, features);
        else
@@ -2657,7 +2770,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 
        dout("handle_reply tid %lld result %d\n", tid, result);
        rinfo = &req->r_reply_info;
-       err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
+       if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
+               err = parse_reply_info(msg, rinfo, (u64)-1);
+       else
+               err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
        mutex_unlock(&mdsc->mutex);
 
        mutex_lock(&session->s_mutex);
index d3a5c40..0919aac 100644 (file)
@@ -26,6 +26,7 @@
 #define CEPHFS_FEATURES_CLIENT_SUPPORTED {     \
        0, 1, 2, 3, 4, 5, 6, 7,                 \
        CEPHFS_FEATURE_MIMIC,                   \
+       CEPHFS_FEATURE_REPLY_ENCODING,          \
        CEPHFS_FEATURE_LAZY_CAP_WANTED,         \
        CEPHFS_FEATURE_MULTI_RECONNECT,         \
 }