libceph, ceph: implement msgr2.1 protocol (crc and secure modes)
[linux-2.6-microblaze.git] / fs / ceph / mds_client.c
index 8f1d750..740d63d 100644 (file)
@@ -516,13 +516,9 @@ static int parse_reply_info_create(void **p, void *end,
                        /* Malformed reply? */
                        info->has_create_ino = false;
                } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
-                       u8 struct_v, struct_compat;
-                       u32 len;
-
                        info->has_create_ino = true;
-                       ceph_decode_8_safe(p, end, struct_v, bad);
-                       ceph_decode_8_safe(p, end, struct_compat, bad);
-                       ceph_decode_32_safe(p, end, len, bad);
+                       /* struct_v, struct_compat, and len */
+                       ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
                        ceph_decode_64_safe(p, end, info->ino, bad);
                        ret = ceph_parse_deleg_inos(p, end, s);
                        if (ret)
@@ -837,6 +833,7 @@ void ceph_mdsc_release_request(struct kref *kref)
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
+       put_cred(req->r_cred);
        if (req->r_pagelist)
                ceph_pagelist_release(req->r_pagelist);
        put_request_session(req);
@@ -892,8 +889,7 @@ static void __register_request(struct ceph_mds_client *mdsc,
        ceph_mdsc_get_request(req);
        insert_request(&mdsc->request_tree, req);
 
-       req->r_uid = current_fsuid();
-       req->r_gid = current_fsgid();
+       req->r_cred = get_current_cred();
 
        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
                mdsc->oldest_tid = req->r_tid;
@@ -1243,7 +1239,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
 {
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;
-       int i = -1;
+       int i;
        int extra_bytes = 0;
        int metadata_key_count = 0;
        struct ceph_options *opt = mdsc->fsc->client->options;
@@ -1595,7 +1591,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                struct ceph_cap_flush *cf;
                struct ceph_mds_client *mdsc = fsc->mdsc;
 
-               if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+               if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
                        if (inode->i_data.nrpages > 0)
                                invalidate = true;
                        if (ci->i_wrbuffer_ref > 0)
@@ -2482,21 +2478,24 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
 /*
  * called under mdsc->mutex
  */
-static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
+static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
                                               struct ceph_mds_request *req,
-                                              int mds, bool drop_cap_releases)
+                                              bool drop_cap_releases)
 {
+       int mds = session->s_mds;
+       struct ceph_mds_client *mdsc = session->s_mdsc;
        struct ceph_msg *msg;
-       struct ceph_mds_request_head *head;
+       struct ceph_mds_request_head_old *head;
        const char *path1 = NULL;
        const char *path2 = NULL;
        u64 ino1 = 0, ino2 = 0;
        int pathlen1 = 0, pathlen2 = 0;
        bool freepath1 = false, freepath2 = false;
-       int len;
+       int len, i;
        u16 releases;
        void *p, *end;
        int ret;
+       bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
 
        ret = set_request_path_attr(req->r_inode, req->r_dentry,
                              req->r_parent, req->r_path1, req->r_ino1.ino,
@@ -2518,14 +2517,23 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
                goto out_free1;
        }
 
-       len = sizeof(*head) +
-               pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
+       if (legacy) {
+               /* Old style */
+               len = sizeof(*head);
+       } else {
+               /* New style: add gid_list and any later fields */
+               len = sizeof(struct ceph_mds_request_head) + sizeof(u32) +
+                     (sizeof(u64) * req->r_cred->group_info->ngroups);
+       }
+
+       len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
                sizeof(struct ceph_timespec);
 
        /* calculate (max) length for cap releases */
        len += sizeof(struct ceph_mds_request_release) *
                (!!req->r_inode_drop + !!req->r_dentry_drop +
                 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
+
        if (req->r_dentry_drop)
                len += pathlen1;
        if (req->r_old_dentry_drop)
@@ -2537,17 +2545,33 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
                goto out_free2;
        }
 
-       msg->hdr.version = cpu_to_le16(2);
        msg->hdr.tid = cpu_to_le64(req->r_tid);
 
-       head = msg->front.iov_base;
-       p = msg->front.iov_base + sizeof(*head);
+       /*
+        * The old ceph_mds_request_header didn't contain a version field, and
+        * one was added when we moved the message version from 3->4.
+        */
+       if (legacy) {
+               msg->hdr.version = cpu_to_le16(3);
+               head = msg->front.iov_base;
+               p = msg->front.iov_base + sizeof(*head);
+       } else {
+               struct ceph_mds_request_head *new_head = msg->front.iov_base;
+
+               msg->hdr.version = cpu_to_le16(4);
+               new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
+               head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
+               p = msg->front.iov_base + sizeof(*new_head);
+       }
+
        end = msg->front.iov_base + msg->front.iov_len;
 
        head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
        head->op = cpu_to_le32(req->r_op);
-       head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
-       head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
+       head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
+                                                req->r_cred->fsuid));
+       head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
+                                                req->r_cred->fsgid));
        head->ino = cpu_to_le64(req->r_deleg_ino);
        head->args = req->r_args;
 
@@ -2592,6 +2616,14 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
                ceph_encode_copy(&p, &ts, sizeof(ts));
        }
 
+       /* gid list */
+       if (!legacy) {
+               ceph_encode_32(&p, req->r_cred->group_info->ngroups);
+               for (i = 0; i < req->r_cred->group_info->ngroups; i++)
+                       ceph_encode_64(&p, from_kgid(&init_user_ns,
+                                      req->r_cred->group_info->gid[i]));
+       }
+
        if (WARN_ON_ONCE(p > end)) {
                ceph_msg_put(msg);
                msg = ERR_PTR(-ERANGE);
@@ -2635,14 +2667,28 @@ static void complete_request(struct ceph_mds_client *mdsc,
        complete_all(&req->r_completion);
 }
 
+static struct ceph_mds_request_head_old *
+find_old_request_head(void *p, u64 features)
+{
+       bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
+       struct ceph_mds_request_head *new_head;
+
+       if (legacy)
+               return (struct ceph_mds_request_head_old *)p;
+       new_head = (struct ceph_mds_request_head *)p;
+       return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
+}
+
 /*
  * called under mdsc->mutex
  */
-static int __prepare_send_request(struct ceph_mds_client *mdsc,
+static int __prepare_send_request(struct ceph_mds_session *session,
                                  struct ceph_mds_request *req,
-                                 int mds, bool drop_cap_releases)
+                                 bool drop_cap_releases)
 {
-       struct ceph_mds_request_head *rhead;
+       int mds = session->s_mds;
+       struct ceph_mds_client *mdsc = session->s_mdsc;
+       struct ceph_mds_request_head_old *rhead;
        struct ceph_msg *msg;
        int flags = 0;
 
@@ -2661,6 +2707,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 
        if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
                void *p;
+
                /*
                 * Replay.  Do not regenerate message (and rebuild
                 * paths, etc.); just use the original message.
@@ -2668,7 +2715,8 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
                 * d_move mangles the src name.
                 */
                msg = req->r_request;
-               rhead = msg->front.iov_base;
+               rhead = find_old_request_head(msg->front.iov_base,
+                                             session->s_con.peer_features);
 
                flags = le32_to_cpu(rhead->flags);
                flags |= CEPH_MDS_FLAG_REPLAY;
@@ -2699,14 +2747,15 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
                ceph_msg_put(req->r_request);
                req->r_request = NULL;
        }
-       msg = create_request_message(mdsc, req, mds, drop_cap_releases);
+       msg = create_request_message(session, req, drop_cap_releases);
        if (IS_ERR(msg)) {
                req->r_err = PTR_ERR(msg);
                return PTR_ERR(msg);
        }
        req->r_request = msg;
 
-       rhead = msg->front.iov_base;
+       rhead = find_old_request_head(msg->front.iov_base,
+                                     session->s_con.peer_features);
        rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
        if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
                flags |= CEPH_MDS_FLAG_REPLAY;
@@ -2725,15 +2774,13 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 /*
  * called under mdsc->mutex
  */
-static int __send_request(struct ceph_mds_client *mdsc,
-                         struct ceph_mds_session *session,
+static int __send_request(struct ceph_mds_session *session,
                          struct ceph_mds_request *req,
                          bool drop_cap_releases)
 {
        int err;
 
-       err = __prepare_send_request(mdsc, req, session->s_mds,
-                                    drop_cap_releases);
+       err = __prepare_send_request(session, req, drop_cap_releases);
        if (!err) {
                ceph_msg_get(req->r_request);
                ceph_con_send(&session->s_con, req->r_request);
@@ -2818,10 +2865,6 @@ static void __do_request(struct ceph_mds_client *mdsc,
             ceph_session_state_name(session->s_state));
        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
            session->s_state != CEPH_MDS_SESSION_HUNG) {
-               if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
-                       err = -EACCES;
-                       goto out_session;
-               }
                /*
                 * We cannot queue async requests since the caps and delegated
                 * inodes are bound to the session. Just return -EJUKEBOX and
@@ -2831,6 +2874,20 @@ static void __do_request(struct ceph_mds_client *mdsc,
                        err = -EJUKEBOX;
                        goto out_session;
                }
+
+               /*
+                * If the session has been REJECTED, then return a hard error,
+                * unless it's a CLEANRECOVER mount, in which case we'll queue
+                * it to the mdsc queue.
+                */
+               if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
+                       if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
+                               list_add(&req->r_wait, &mdsc->waiting_for_map);
+                       else
+                               err = -EACCES;
+                       goto out_session;
+               }
+
                if (session->s_state == CEPH_MDS_SESSION_NEW ||
                    session->s_state == CEPH_MDS_SESSION_CLOSING) {
                        err = __open_session(mdsc, session);
@@ -2850,7 +2907,7 @@ static void __do_request(struct ceph_mds_client *mdsc,
        if (req->r_request_started == 0)   /* note request start time */
                req->r_request_started = jiffies;
 
-       err = __send_request(mdsc, session, req, false);
+       err = __send_request(session, req, false);
 
 out_session:
        ceph_put_mds_session(session);
@@ -3173,6 +3230,23 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
        mutex_unlock(&mdsc->mutex);
 
+       /* Must find target inode outside of mutexes to avoid deadlocks */
+       if ((err >= 0) && rinfo->head->is_target) {
+               struct inode *in;
+               struct ceph_vino tvino = {
+                       .ino  = le64_to_cpu(rinfo->targeti.in->ino),
+                       .snap = le64_to_cpu(rinfo->targeti.in->snapid)
+               };
+
+               in = ceph_get_inode(mdsc->fsc->sb, tvino);
+               if (IS_ERR(in)) {
+                       err = PTR_ERR(in);
+                       mutex_lock(&session->s_mutex);
+                       goto out_err;
+               }
+               req->r_target_inode = in;
+       }
+
        mutex_lock(&session->s_mutex);
        if (err < 0) {
                pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
@@ -3514,7 +3588,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 
        mutex_lock(&mdsc->mutex);
        list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
-               __send_request(mdsc, session, req, true);
+               __send_request(session, req, true);
 
        /*
         * also re-send old requests when MDS enters reconnect stage. So that MDS
@@ -3535,7 +3609,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 
                ceph_mdsc_release_dir_caps_no_check(req);
 
-               __send_request(mdsc, session, req, true);
+               __send_request(session, req, true);
        }
        mutex_unlock(&mdsc->mutex);
 }
@@ -4374,12 +4448,7 @@ static void maybe_recover_session(struct ceph_mds_client *mdsc)
        if (!READ_ONCE(fsc->blocklisted))
                return;
 
-       if (fsc->last_auto_reconnect &&
-           time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
-               return;
-
        pr_info("auto reconnect after blocklisted\n");
-       fsc->last_auto_reconnect = jiffies;
        ceph_force_reconnect(fsc->sb);
 }
 
@@ -4678,7 +4747,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 {
        u64 want_tid, want_flush;
 
-       if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
+       if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
                return;
 
        dout("sync\n");
@@ -4855,10 +4924,8 @@ void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
        u32 epoch;
-       u32 map_len;
        u32 num_fs;
        u32 mount_fscid = (u32)-1;
-       u8 struct_v, struct_cv;
        int err = -EINVAL;
 
        ceph_decode_need(&p, end, sizeof(u32), bad);
@@ -4866,24 +4933,17 @@ void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 
        dout("handle_fsmap epoch %u\n", epoch);
 
-       ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
-       struct_v = ceph_decode_8(&p);
-       struct_cv = ceph_decode_8(&p);
-       map_len = ceph_decode_32(&p);
-
-       ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
-       p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
+       /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
+       ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);
 
-       num_fs = ceph_decode_32(&p);
+       ceph_decode_32_safe(&p, end, num_fs, bad);
        while (num_fs-- > 0) {
                void *info_p, *info_end;
                u32 info_len;
-               u8 info_v, info_cv;
                u32 fscid, namelen;
 
                ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
-               info_v = ceph_decode_8(&p);
-               info_cv = ceph_decode_8(&p);
+               p += 2;         // info_v, info_cv
                info_len = ceph_decode_32(&p);
                ceph_decode_need(&p, end, info_len, bad);
                info_p = p;
@@ -4954,7 +5014,7 @@ void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
                return;
        }
 
-       newmap = ceph_mdsmap_decode(&p, end);
+       newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client));
        if (IS_ERR(newmap)) {
                err = PTR_ERR(newmap);
                goto bad_unlock;
@@ -5118,8 +5178,11 @@ static int verify_authorizer_reply(struct ceph_connection *con)
        struct ceph_mds_session *s = con->private;
        struct ceph_mds_client *mdsc = s->s_mdsc;
        struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
+       struct ceph_auth_handshake *auth = &s->s_auth;
 
-       return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
+       return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
+               auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
+               NULL, NULL, NULL, NULL);
 }
 
 static int invalidate_authorizer(struct ceph_connection *con)
@@ -5133,6 +5196,80 @@ static int invalidate_authorizer(struct ceph_connection *con)
        return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
 }
 
+static int mds_get_auth_request(struct ceph_connection *con,
+                               void *buf, int *buf_len,
+                               void **authorizer, int *authorizer_len)
+{
+       struct ceph_mds_session *s = con->private;
+       struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
+       struct ceph_auth_handshake *auth = &s->s_auth;
+       int ret;
+
+       ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
+                                      buf, buf_len);
+       if (ret)
+               return ret;
+
+       *authorizer = auth->authorizer_buf;
+       *authorizer_len = auth->authorizer_buf_len;
+       return 0;
+}
+
+static int mds_handle_auth_reply_more(struct ceph_connection *con,
+                                     void *reply, int reply_len,
+                                     void *buf, int *buf_len,
+                                     void **authorizer, int *authorizer_len)
+{
+       struct ceph_mds_session *s = con->private;
+       struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
+       struct ceph_auth_handshake *auth = &s->s_auth;
+       int ret;
+
+       ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
+                                             buf, buf_len);
+       if (ret)
+               return ret;
+
+       *authorizer = auth->authorizer_buf;
+       *authorizer_len = auth->authorizer_buf_len;
+       return 0;
+}
+
+static int mds_handle_auth_done(struct ceph_connection *con,
+                               u64 global_id, void *reply, int reply_len,
+                               u8 *session_key, int *session_key_len,
+                               u8 *con_secret, int *con_secret_len)
+{
+       struct ceph_mds_session *s = con->private;
+       struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
+       struct ceph_auth_handshake *auth = &s->s_auth;
+
+       return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
+                                              session_key, session_key_len,
+                                              con_secret, con_secret_len);
+}
+
+static int mds_handle_auth_bad_method(struct ceph_connection *con,
+                                     int used_proto, int result,
+                                     const int *allowed_protos, int proto_cnt,
+                                     const int *allowed_modes, int mode_cnt)
+{
+       struct ceph_mds_session *s = con->private;
+       struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
+       int ret;
+
+       if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
+                                           used_proto, result,
+                                           allowed_protos, proto_cnt,
+                                           allowed_modes, mode_cnt)) {
+               ret = ceph_monc_validate_auth(monc);
+               if (ret)
+                       return ret;
+       }
+
+       return -EACCES;
+}
+
 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
                                struct ceph_msg_header *hdr, int *skip)
 {
@@ -5182,6 +5319,10 @@ static const struct ceph_connection_operations mds_con_ops = {
        .alloc_msg = mds_alloc_msg,
        .sign_message = mds_sign_message,
        .check_message_signature = mds_check_message_signature,
+       .get_auth_request = mds_get_auth_request,
+       .handle_auth_reply_more = mds_handle_auth_reply_more,
+       .handle_auth_done = mds_handle_auth_done,
+       .handle_auth_bad_method = mds_handle_auth_bad_method,
 };
 
 /* eof */