ceph: fix possible long time wait during umount
[linux-2.6-microblaze.git] / fs / ceph / mds_client.c
index a516329..6dca3b4 100644 (file)
@@ -708,8 +708,10 @@ void ceph_mdsc_release_request(struct kref *kref)
                /* avoid calling iput_final() in mds dispatch threads */
                ceph_async_iput(req->r_inode);
        }
-       if (req->r_parent)
+       if (req->r_parent) {
                ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
+               ceph_async_iput(req->r_parent);
+       }
        ceph_async_iput(req->r_target_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
@@ -972,14 +974,14 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
                                     frag.frag, mds,
                                     (int)r, frag.ndist);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
-                                   CEPH_MDS_STATE_ACTIVE)
+                                   CEPH_MDS_STATE_ACTIVE &&
+                                   !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
                                        goto out;
                        }
 
                        /* since this file/dir wasn't known to be
                         * replicated, then we want to look for the
                         * authoritative mds. */
-                       mode = USE_AUTH_MDS;
                        if (frag.mds >= 0) {
                                /* choose auth mds */
                                mds = frag.mds;
@@ -987,9 +989,14 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
                                     "frag %u mds%d (auth)\n",
                                     inode, ceph_vinop(inode), frag.frag, mds);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
-                                   CEPH_MDS_STATE_ACTIVE)
-                                       goto out;
+                                   CEPH_MDS_STATE_ACTIVE) {
+                                       if (mode == USE_ANY_MDS &&
+                                           !ceph_mdsmap_is_laggy(mdsc->mdsmap,
+                                                                 mds))
+                                               goto out;
+                               }
                        }
+                       mode = USE_AUTH_MDS;
                }
        }
 
@@ -2015,7 +2022,7 @@ void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
        if (!nr)
                return;
        val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
-       if (!(val % CEPH_CAPS_PER_RELEASE)) {
+       if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
                atomic_set(&mdsc->cap_reclaim_pending, 0);
                ceph_queue_cap_reclaim_work(mdsc);
        }
@@ -2032,12 +2039,13 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
        struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
        size_t size = sizeof(struct ceph_mds_reply_dir_entry);
-       int order, num_entries;
+       unsigned int num_entries;
+       int order;
 
        spin_lock(&ci->i_ceph_lock);
        num_entries = ci->i_files + ci->i_subdirs;
        spin_unlock(&ci->i_ceph_lock);
-       num_entries = max(num_entries, 1);
+       num_entries = max(num_entries, 1U);
        num_entries = min(num_entries, opt->max_readdir);
 
        order = get_order(size * num_entries);
@@ -2182,13 +2190,17 @@ retry:
        }
        base = ceph_ino(d_inode(temp));
        rcu_read_unlock();
-       if (pos < 0 || read_seqretry(&rename_lock, seq)) {
-               pr_err("build_path did not end path lookup where "
-                      "expected, pos is %d\n", pos);
-               /* presumably this is only possible if racing with a
-                  rename of one of the parent directories (we can not
-                  lock the dentries above us to prevent this, but
-                  retrying should be harmless) */
+
+       if (read_seqretry(&rename_lock, seq))
+               goto retry;
+
+       if (pos < 0) {
+               /*
+                * A rename didn't occur, but somehow we didn't end up where
+                * we thought we would. Throw a warning and try again.
+                */
+               pr_warn("build_path did not end path lookup where "
+                       "expected, pos is %d\n", pos);
                goto retry;
        }
 
@@ -2345,6 +2357,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
        head->op = cpu_to_le32(req->r_op);
        head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
        head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
+       head->ino = 0;
        head->args = req->r_args;
 
        ceph_encode_filepath(&p, end, ino1, path1);
@@ -2670,8 +2683,10 @@ int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
        /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
        if (req->r_inode)
                ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
-       if (req->r_parent)
+       if (req->r_parent) {
                ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
+               ihold(req->r_parent);
+       }
        if (req->r_old_dentry_dir)
                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
@@ -2869,6 +2884,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
                __unregister_request(mdsc, req);
 
+               /* last request during umount? */
+               if (mdsc->stopping && !__get_oldest_req(mdsc))
+                       complete_all(&mdsc->safe_umount_waiters);
+
                if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
                        /*
                         * We already handled the unsafe response, now do the
@@ -2879,9 +2898,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                         */
                        dout("got safe reply %llu, mds%d\n", tid, mds);
 
-                       /* last unsafe request during umount? */
-                       if (mdsc->stopping && !__get_oldest_req(mdsc))
-                               complete_all(&mdsc->safe_umount_waiters);
                        mutex_unlock(&mdsc->mutex);
                        goto out;
                }
@@ -4163,6 +4179,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
        mdsc->last_renew_caps = jiffies;
        INIT_LIST_HEAD(&mdsc->cap_delay_list);
+       INIT_LIST_HEAD(&mdsc->cap_wait_list);
        spin_lock_init(&mdsc->cap_delay_lock);
        INIT_LIST_HEAD(&mdsc->snap_flush_list);
        spin_lock_init(&mdsc->snap_flush_lock);
@@ -4598,11 +4615,8 @@ static struct ceph_connection *con_get(struct ceph_connection *con)
 {
        struct ceph_mds_session *s = con->private;
 
-       if (get_session(s)) {
-               dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
+       if (get_session(s))
                return con;
-       }
-       dout("mdsc con_get %p FAIL\n", s);
        return NULL;
 }
 
@@ -4610,7 +4624,6 @@ static void con_put(struct ceph_connection *con)
 {
        struct ceph_mds_session *s = con->private;
 
-       dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
        ceph_put_mds_session(s);
 }