Merge branch 'misc.namei' of git://git.kernel.org/pub/scm/linux/kernel/git/viro/vfs
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 0b69aec..7cad180 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -11,6 +11,7 @@
 #include <linux/ratelimit.h>
 #include <linux/bits.h>
 #include <linux/ktime.h>
+#include <linux/bitmap.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -652,14 +653,9 @@ const char *ceph_session_state_name(int s)
 
 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
 {
-       if (refcount_inc_not_zero(&s->s_ref)) {
-               dout("mdsc get_session %p %d -> %d\n", s,
-                    refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
+       if (refcount_inc_not_zero(&s->s_ref))
                return s;
-       } else {
-               dout("mdsc get_session %p 0 -- FAIL\n", s);
-               return NULL;
-       }
+       return NULL;
 }
 
 void ceph_put_mds_session(struct ceph_mds_session *s)
@@ -667,8 +663,6 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
        if (IS_ERR_OR_NULL(s))
                return;
 
-       dout("mdsc put_session %p %d -> %d\n", s,
-            refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
        if (refcount_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
@@ -743,8 +737,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
-       s->s_ttl = 0;
-       s->s_seq = 0;
        mutex_init(&s->s_mutex);
 
        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
@@ -753,17 +745,11 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        s->s_cap_ttl = jiffies - 1;
 
        spin_lock_init(&s->s_cap_lock);
-       s->s_renew_requested = 0;
-       s->s_renew_seq = 0;
        INIT_LIST_HEAD(&s->s_caps);
-       s->s_nr_caps = 0;
        refcount_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        xa_init(&s->s_delegated_inos);
-       s->s_num_cap_releases = 0;
-       s->s_cap_reconnect = 0;
-       s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
 
@@ -811,6 +797,33 @@ static void put_request_session(struct ceph_mds_request *req)
        }
 }
 
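+/*
+ * Iterate over all known sessions, calling cb on each. A reference is
+ * taken on the session and mdsc->mutex is dropped across each callback,
+ * then the reference is put once cb returns. If check_state is true,
+ * sessions that fail check_session_state() are skipped.
+ */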
+void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
+                               void (*cb)(struct ceph_mds_session *),
+                               bool check_state)
+{
+       int mds;
+
+       mutex_lock(&mdsc->mutex);
+       for (mds = 0; mds < mdsc->max_sessions; ++mds) {
+               struct ceph_mds_session *s;
+
+               s = __ceph_lookup_mds_session(mdsc, mds);
+               if (!s)
+                       continue;
+
+               if (check_state && !check_session_state(s)) {
+                       ceph_put_mds_session(s);
+                       continue;
+               }
+
+               mutex_unlock(&mdsc->mutex);
+               cb(s);
+               ceph_put_mds_session(s);
+               mutex_lock(&mdsc->mutex);
+       }
+       mutex_unlock(&mdsc->mutex);
+}
+
 void ceph_mdsc_release_request(struct kref *kref)
 {
        struct ceph_mds_request *req = container_of(kref,
@@ -1155,7 +1168,7 @@ random:
 /*
  * session messages
  */
-static struct ceph_msg *create_session_msg(u32 op, u64 seq)
+struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
 {
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;
@@ -1163,7 +1176,8 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
                           false);
        if (!msg) {
-               pr_err("create_session_msg ENOMEM creating msg\n");
+               pr_err("ENOMEM creating session %s msg\n",
+                      ceph_session_op_name(op));
                return NULL;
        }
        h = msg->front.iov_base;
@@ -1294,7 +1308,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
                           GFP_NOFS, false);
        if (!msg) {
-               pr_err("create_session_msg ENOMEM creating msg\n");
+               pr_err("ENOMEM creating session open msg\n");
                return ERR_PTR(-ENOMEM);
        }
        p = msg->front.iov_base;
@@ -1583,14 +1597,39 @@ out:
        return ret;
 }
 
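+/*
+ * Remove and release all of the inode's cap snapshots. The caller must
+ * hold ci->i_ceph_lock. Returns the number of capsnaps removed, so the
+ * caller can drop the matching inode references afterwards.
+ */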
+static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_cap_snap *capsnap;
+       int capsnap_release = 0;
+
+       lockdep_assert_held(&ci->i_ceph_lock);
+
+       dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode);
+
+       while (!list_empty(&ci->i_cap_snaps)) {
+               capsnap = list_first_entry(&ci->i_cap_snaps,
+                                          struct ceph_cap_snap, ci_item);
+               __ceph_remove_capsnap(inode, capsnap, NULL, NULL);
+               ceph_put_snap_context(capsnap->context);
+               ceph_put_cap_snap(capsnap);
+               capsnap_release++;
+       }
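+       /* wake up any waiters blocked on this inode's caps or cap flushes */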
+       wake_up_all(&ci->i_cap_wq);
+       wake_up_all(&mdsc->cap_flushing_wq);
+       return capsnap_release;
+}
+
 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                                  void *arg)
 {
        struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        LIST_HEAD(to_remove);
        bool dirty_dropped = false;
        bool invalidate = false;
+       int capsnap_release = 0;
 
        dout("removing cap %p, ci is %p, inode is %p\n",
             cap, ci, &ci->vfs_inode);
@@ -1598,7 +1637,6 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
        __ceph_remove_cap(cap, false);
        if (!ci->i_auth_cap) {
                struct ceph_cap_flush *cf;
-               struct ceph_mds_client *mdsc = fsc->mdsc;
 
                if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
                        if (inode->i_data.nrpages > 0)
@@ -1662,6 +1700,9 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                        list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
                        ci->i_prealloc_cap_flush = NULL;
                }
+
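+               /* drop any remaining capsnaps; the matching iputs happen below */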
+               if (!list_empty(&ci->i_cap_snaps))
+                       capsnap_release = remove_capsnaps(mdsc, inode);
        }
        spin_unlock(&ci->i_ceph_lock);
        while (!list_empty(&to_remove)) {
@@ -1678,6 +1719,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                ceph_queue_invalidate(inode);
        if (dirty_dropped)
                iput(inode);
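+       /* drop the inode references pinned by the removed capsnaps */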
+       while (capsnap_release--)
+               iput(inode);
        return 0;
 }
 
@@ -1803,8 +1846,8 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
 
        dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
                ceph_mds_state_name(state));
-       msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
-                                ++session->s_renew_seq);
+       msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
+                                     ++session->s_renew_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
@@ -1818,7 +1861,7 @@ static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
 
        dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
             session->s_mds, ceph_session_state_name(session->s_state), seq);
-       msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
+       msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
@@ -1870,7 +1913,8 @@ static int request_close_session(struct ceph_mds_session *session)
        dout("request_close_session mds%d state %s seq %lld\n",
             session->s_mds, ceph_session_state_name(session->s_state),
             session->s_seq);
-       msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
+       msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
+                                     session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
@@ -1965,7 +2009,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
 
        if (oissued) {
                /* we aren't the only cap.. just remove us */
-               __ceph_remove_cap(cap, true);
+               ceph_remove_cap(cap, true);
                (*remaining)--;
        } else {
                struct dentry *dentry;
@@ -4150,13 +4194,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                          struct ceph_mdsmap *newmap,
                          struct ceph_mdsmap *oldmap)
 {
-       int i;
+       int i, j, err;
        int oldstate, newstate;
        struct ceph_mds_session *s;
+       DECLARE_BITMAP(targets, CEPH_MAX_MDS) = {0};
 
        dout("check_new_map new %u old %u\n",
             newmap->m_epoch, oldmap->m_epoch);
 
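+       /* mark every rank that appears as an export target in the new map */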
+       if (newmap->m_info) {
+               for (i = 0; i < newmap->possible_max_rank; i++) {
+                       for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
+                               set_bit(newmap->m_info[i].export_targets[j], targets);
+               }
+       }
+
        for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
                if (!mdsc->sessions[i])
                        continue;
@@ -4210,6 +4262,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
                    newstate >= CEPH_MDS_STATE_RECONNECT) {
                        mutex_unlock(&mdsc->mutex);
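+                       /* this rank gets its reconnect here; skip it below */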
+                       clear_bit(i, targets);
                        send_mds_reconnect(mdsc, s);
                        mutex_lock(&mdsc->mutex);
                }
@@ -4232,6 +4285,51 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                }
        }
 
+       /*
+        * Open and reconnect sessions to the export targets that were
+        * not already handled above.
+        */
+       for (i = 0; i < newmap->possible_max_rank; i++) {
+               /*
+                * If the importing MDS crashed just after flushing the
+                * EImportStart journal entry, then when a standby MDS
+                * takes over and replays that entry it will wait for
+                * this client to reconnect, but the client may never
+                * have registered or opened a session with it yet.
+                *
+                * So try to reconnect to any MDS daemon whose rank is
+                * set in the export targets bitmap and which is in the
+                * up:reconnect state.
+                */
+               newstate = ceph_mdsmap_get_state(newmap, i);
+               if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
+                       continue;
+
+               /*
+                * In rare cases the session may already have been
+                * registered and opened by a request that picked a
+                * random MDS during the mdsc->mutex unlock/lock gap
+                * below. That MDS daemon will just queue such requests
+                * and keep waiting in the up:reconnect state for the
+                * client's reconnect request.
+                */
+               s = __ceph_lookup_mds_session(mdsc, i);
+               if (likely(!s)) {
+                       s = __open_export_target_session(mdsc, i);
+                       if (IS_ERR(s)) {
+                               err = PTR_ERR(s);
+                               pr_err("failed to open export target session, err %d\n",
+                                      err);
+                               continue;
+                       }
+               }
+               dout("send reconnect to export target mds.%d\n", i);
+               mutex_unlock(&mdsc->mutex);
+               send_mds_reconnect(mdsc, s);
+               ceph_put_mds_session(s);
+               mutex_lock(&mdsc->mutex);
+       }
+
        for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
                s = mdsc->sessions[i];
                if (!s)
@@ -4409,24 +4507,12 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
 }
 
 /*
- * lock unlock sessions, to wait ongoing session activities
+ * lock and unlock the session to wait for ongoing session activity
  */
-static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
+static void lock_unlock_session(struct ceph_mds_session *s)
 {
-       int i;
-
-       mutex_lock(&mdsc->mutex);
-       for (i = 0; i < mdsc->max_sessions; i++) {
-               struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
-               if (!s)
-                       continue;
-               mutex_unlock(&mdsc->mutex);
-               mutex_lock(&s->s_mutex);
-               mutex_unlock(&s->s_mutex);
-               ceph_put_mds_session(s);
-               mutex_lock(&mdsc->mutex);
-       }
-       mutex_unlock(&mdsc->mutex);
+       mutex_lock(&s->s_mutex);
+       mutex_unlock(&s->s_mutex);
 }
 
 static void maybe_recover_session(struct ceph_mds_client *mdsc)
@@ -4448,6 +4534,8 @@ static void maybe_recover_session(struct ceph_mds_client *mdsc)
 
 bool check_session_state(struct ceph_mds_session *s)
 {
+       struct ceph_fs_client *fsc = s->s_mdsc->fsc;
+
        switch (s->s_state) {
        case CEPH_MDS_SESSION_OPEN:
                if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
@@ -4456,8 +4544,9 @@ bool check_session_state(struct ceph_mds_session *s)
                }
                break;
        case CEPH_MDS_SESSION_CLOSING:
-               /* Should never reach this when we're unmounting */
-               WARN_ON_ONCE(s->s_ttl);
+               /* Should never reach this unless we are force unmounting */
+               WARN_ON_ONCE(s->s_ttl &&
+                            READ_ONCE(fsc->mount_state) != CEPH_MOUNT_SHUTDOWN);
                fallthrough;
        case CEPH_MDS_SESSION_NEW:
        case CEPH_MDS_SESSION_RESTARTING:
@@ -4584,21 +4673,12 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        init_completion(&mdsc->safe_umount_waiters);
        init_waitqueue_head(&mdsc->session_close_wq);
        INIT_LIST_HEAD(&mdsc->waiting_for_map);
-       mdsc->sessions = NULL;
-       atomic_set(&mdsc->num_sessions, 0);
-       mdsc->max_sessions = 0;
-       mdsc->stopping = 0;
-       atomic64_set(&mdsc->quotarealms_count, 0);
        mdsc->quotarealms_inodes = RB_ROOT;
        mutex_init(&mdsc->quotarealms_inodes_mutex);
-       mdsc->last_snap_seq = 0;
        init_rwsem(&mdsc->snap_rwsem);
        mdsc->snap_realms = RB_ROOT;
        INIT_LIST_HEAD(&mdsc->snap_empty);
-       mdsc->num_snap_realms = 0;
        spin_lock_init(&mdsc->snap_empty_lock);
-       mdsc->last_tid = 0;
-       mdsc->oldest_tid = 0;
        mdsc->request_tree = RB_ROOT;
        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
        mdsc->last_renew_caps = jiffies;
@@ -4610,11 +4690,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        mdsc->last_cap_flush_tid = 1;
        INIT_LIST_HEAD(&mdsc->cap_flush_list);
        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
-       mdsc->num_cap_flushing = 0;
        spin_lock_init(&mdsc->cap_dirty_lock);
        init_waitqueue_head(&mdsc->cap_flushing_wq);
        INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
-       atomic_set(&mdsc->cap_reclaim_pending, 0);
        err = ceph_metric_init(&mdsc->metric);
        if (err)
                goto err_mdsmap;
@@ -4676,6 +4754,30 @@ static void wait_requests(struct ceph_mds_client *mdsc)
        dout("wait_requests done\n");
 }
 
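+/*
+ * Ask the MDS to flush its journal (mdlog) so that in-flight unsafe
+ * requests get committed sooner; called for each session from
+ * ceph_mdsc_pre_umount().
+ */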
+void send_flush_mdlog(struct ceph_mds_session *s)
+{
+       struct ceph_msg *msg;
+
+       /*
+        * Pre-luminous MDS crashes when it sees an unknown session request
+        */
+       if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
+               return;
+
+       mutex_lock(&s->s_mutex);
+       dout("request mdlog flush to mds%d (%s) seq %lld\n", s->s_mds,
+            ceph_session_state_name(s->s_state), s->s_seq);
+       msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
+                                     s->s_seq);
+       if (!msg) {
+               pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
+                      s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
+       } else {
+               ceph_con_send(&s->s_con, msg);
+       }
+       mutex_unlock(&s->s_mutex);
+}
+
 /*
  * called before mount is ro, and before dentries are torn down.
  * (hmm, does this still race with new lookups?)
@@ -4685,7 +4787,8 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
        dout("pre_umount\n");
        mdsc->stopping = 1;
 
-       lock_unlock_sessions(mdsc);
+       ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
+       ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
        ceph_flush_dirty_caps(mdsc);
        wait_requests(mdsc);
 
@@ -4912,7 +5015,6 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
 
        ceph_metric_destroy(&mdsc->metric);
 
-       flush_delayed_work(&mdsc->metric.delayed_work);
        fsc->mdsc = NULL;
        kfree(mdsc);
        dout("mdsc_destroy %p done\n", mdsc);