ceph: fix race of queuing delayed caps
authorYan, Zheng <zyan@redhat.com>
Mon, 8 Jan 2018 06:44:10 +0000 (14:44 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 29 Jan 2018 17:36:11 +0000 (18:36 +0100)
When called with CHECK_CAPS_AUTHONLY flag, ceph_check_caps() only
processes auth caps. In that case, it's unsafe to remove inode
from mdsc->cap_delay_list, because there can be delayed non-auth
caps.

Besides, ceph_check_caps() may lock/unlock i_ceph_lock several
times, when multiple threads call ceph_check_caps() at the same
time. It's possible that one thread calls __cap_delay_requeue(),
another thread calls __cap_delay_cancel(). __cap_delay_cancel()
should be called at very beginning of ceph_check_caps(), so that
it does not race with __cap_delay_requeue().

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/caps.c

index 36f8a4b..1726ddc 100644 (file)
@@ -902,6 +902,11 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
 /*
  * called under i_ceph_lock
  */
+static int __ceph_is_single_caps(struct ceph_inode_info *ci)
+{
+       return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
+}
+
 static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 {
        return !RB_EMPTY_ROOT(&ci->i_caps);
@@ -1715,21 +1720,24 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
        int mds = -1;   /* keep track of how far we've gone through i_caps list
                           to avoid an infinite loop on retry */
        struct rb_node *p;
-       int delayed = 0, sent = 0, num;
-       bool is_delayed = flags & CHECK_CAPS_NODELAY;
+       int delayed = 0, sent = 0;
+       bool no_delay = flags & CHECK_CAPS_NODELAY;
        bool queue_invalidate = false;
-       bool force_requeue = false;
        bool tried_invalidate = false;
 
        /* if we are unmounting, flush any unused caps immediately. */
        if (mdsc->stopping)
-               is_delayed = true;
+               no_delay = true;
 
        spin_lock(&ci->i_ceph_lock);
 
        if (ci->i_ceph_flags & CEPH_I_FLUSH)
                flags |= CHECK_CAPS_FLUSH;
 
+       if (!(flags & CHECK_CAPS_AUTHONLY) ||
+           (ci->i_auth_cap && __ceph_is_single_caps(ci)))
+               __cap_delay_cancel(mdsc, ci);
+
        goto retry_locked;
 retry:
        spin_lock(&ci->i_ceph_lock);
@@ -1784,7 +1792,7 @@ retry_locked:
         * have cached pages, but don't want them, then try to invalidate.
         * If we fail, it's because pages are locked.... try again later.
         */
-       if ((!is_delayed || mdsc->stopping) &&
+       if ((!no_delay || mdsc->stopping) &&
            !S_ISDIR(inode->i_mode) &&          /* ignore readdir cache */
            !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
            inode->i_data.nrpages &&            /* have cached pages */
@@ -1801,10 +1809,8 @@ retry_locked:
                goto retry_locked;
        }
 
-       num = 0;
        for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
                cap = rb_entry(p, struct ceph_cap, ci_node);
-               num++;
 
                /* avoid looping forever */
                if (mds >= cap->mds ||
@@ -1867,7 +1873,7 @@ retry_locked:
                    cap->mds_wanted == want)
                        continue;     /* nope, all good */
 
-               if (is_delayed)
+               if (no_delay)
                        goto ack;
 
                /* delay? */
@@ -1958,15 +1964,8 @@ ack:
                goto retry; /* retake i_ceph_lock and restart our cap scan. */
        }
 
-       /*
-        * Reschedule delayed caps release if we delayed anything,
-        * otherwise cancel.
-        */
-       if (delayed && is_delayed)
-               force_requeue = true;   /* __send_cap delayed release; requeue */
-       if (!delayed && !is_delayed)
-               __cap_delay_cancel(mdsc, ci);
-       else if (!is_delayed || force_requeue)
+       /* Reschedule delayed caps release if we delayed anything */
+       if (delayed)
                __cap_delay_requeue(mdsc, ci);
 
        spin_unlock(&ci->i_ceph_lock);