ceph: fix race of queuing delayed caps
[linux-2.6-microblaze.git] / fs / ceph / caps.c
index a14b2c9..1726ddc 100644 (file)
@@ -498,7 +498,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
         */
        if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
                if (issued & CEPH_CAP_FILE_SHARED)
-                       ci->i_shared_gen++;
+                       atomic_inc(&ci->i_shared_gen);
                if (S_ISDIR(ci->vfs_inode.i_mode)) {
                        dout(" marking %p NOT complete\n", &ci->vfs_inode);
                        __ceph_dir_clear_complete(ci);
@@ -577,18 +577,30 @@ void ceph_add_cap(struct inode *inode,
                }
        }
 
-       if (!ci->i_snap_realm) {
+       if (!ci->i_snap_realm ||
+           ((flags & CEPH_CAP_FLAG_AUTH) &&
+            realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) {
                /*
                 * add this inode to the appropriate snap realm
                 */
                struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
                                                               realmino);
                if (realm) {
+                       struct ceph_snap_realm *oldrealm = ci->i_snap_realm;
+                       if (oldrealm) {
+                               spin_lock(&oldrealm->inodes_with_caps_lock);
+                               list_del_init(&ci->i_snap_realm_item);
+                               spin_unlock(&oldrealm->inodes_with_caps_lock);
+                       }
+
                        spin_lock(&realm->inodes_with_caps_lock);
                        ci->i_snap_realm = realm;
                        list_add(&ci->i_snap_realm_item,
                                 &realm->inodes_with_caps);
                        spin_unlock(&realm->inodes_with_caps_lock);
+
+                       if (oldrealm)
+                               ceph_put_snap_realm(mdsc, oldrealm);
                } else {
                        pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
                               realmino);
@@ -890,6 +902,11 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
 /*
  * called under i_ceph_lock
  */
+static int __ceph_is_single_caps(struct ceph_inode_info *ci)
+{      /* nonzero when the first and last rb nodes of ci->i_caps are the same */
+       return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
+}      /* NOTE(review): presumably also nonzero for an empty tree - confirm */
+
 static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 {
        return !RB_EMPTY_ROOT(&ci->i_caps);
@@ -1703,21 +1720,24 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
        int mds = -1;   /* keep track of how far we've gone through i_caps list
                           to avoid an infinite loop on retry */
        struct rb_node *p;
-       int delayed = 0, sent = 0, num;
-       bool is_delayed = flags & CHECK_CAPS_NODELAY;
+       int delayed = 0, sent = 0;
+       bool no_delay = flags & CHECK_CAPS_NODELAY;
        bool queue_invalidate = false;
-       bool force_requeue = false;
        bool tried_invalidate = false;
 
        /* if we are unmounting, flush any unused caps immediately. */
        if (mdsc->stopping)
-               is_delayed = true;
+               no_delay = true;
 
        spin_lock(&ci->i_ceph_lock);
 
        if (ci->i_ceph_flags & CEPH_I_FLUSH)
                flags |= CHECK_CAPS_FLUSH;
 
+       if (!(flags & CHECK_CAPS_AUTHONLY) ||
+           (ci->i_auth_cap && __ceph_is_single_caps(ci)))
+               __cap_delay_cancel(mdsc, ci);
+
        goto retry_locked;
 retry:
        spin_lock(&ci->i_ceph_lock);
@@ -1772,7 +1792,7 @@ retry_locked:
         * have cached pages, but don't want them, then try to invalidate.
         * If we fail, it's because pages are locked.... try again later.
         */
-       if ((!is_delayed || mdsc->stopping) &&
+       if ((!no_delay || mdsc->stopping) &&
            !S_ISDIR(inode->i_mode) &&          /* ignore readdir cache */
            !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
            inode->i_data.nrpages &&            /* have cached pages */
@@ -1781,27 +1801,16 @@ retry_locked:
            !tried_invalidate) {
                dout("check_caps trying to invalidate on %p\n", inode);
                if (try_nonblocking_invalidate(inode) < 0) {
-                       if (revoking & (CEPH_CAP_FILE_CACHE|
-                                       CEPH_CAP_FILE_LAZYIO)) {
-                               dout("check_caps queuing invalidate\n");
-                               queue_invalidate = true;
-                               ci->i_rdcache_revoking = ci->i_rdcache_gen;
-                       } else {
-                               dout("check_caps failed to invalidate pages\n");
-                               /* we failed to invalidate pages.  check these
-                                  caps again later. */
-                               force_requeue = true;
-                               __cap_set_timeouts(mdsc, ci);
-                       }
+                       dout("check_caps queuing invalidate\n");
+                       queue_invalidate = true;
+                       ci->i_rdcache_revoking = ci->i_rdcache_gen;
                }
                tried_invalidate = true;
                goto retry_locked;
        }
 
-       num = 0;
        for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
                cap = rb_entry(p, struct ceph_cap, ci_node);
-               num++;
 
                /* avoid looping forever */
                if (mds >= cap->mds ||
@@ -1864,7 +1873,7 @@ retry_locked:
                    cap->mds_wanted == want)
                        continue;     /* nope, all good */
 
-               if (is_delayed)
+               if (no_delay)
                        goto ack;
 
                /* delay? */
@@ -1955,15 +1964,8 @@ ack:
                goto retry; /* retake i_ceph_lock and restart our cap scan. */
        }
 
-       /*
-        * Reschedule delayed caps release if we delayed anything,
-        * otherwise cancel.
-        */
-       if (delayed && is_delayed)
-               force_requeue = true;   /* __send_cap delayed release; requeue */
-       if (!delayed && !is_delayed)
-               __cap_delay_cancel(mdsc, ci);
-       else if (!is_delayed || force_requeue)
+       /* Reschedule delayed caps release if we delayed anything */
+       if (delayed)
                __cap_delay_requeue(mdsc, ci);
 
        spin_unlock(&ci->i_ceph_lock);
@@ -3426,7 +3428,14 @@ retry:
         */
 
        issued = cap->issued;
-       WARN_ON(issued != cap->implemented);
+       if (issued != cap->implemented)
+               pr_err_ratelimited("handle_cap_export: issued != implemented: "
+                               "ino (%llx.%llx) mds%d seq %d mseq %d "
+                               "issued %s implemented %s\n",
+                               ceph_vinop(inode), mds, cap->seq, cap->mseq,
+                               ceph_cap_string(issued),
+                               ceph_cap_string(cap->implemented));
+
 
        tcap = __get_cap_for_mds(ci, target);
        if (tcap) {
@@ -3572,12 +3581,13 @@ retry:
                if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
                    (ocap->seq != le32_to_cpu(ph->seq) ||
                     ocap->mseq != le32_to_cpu(ph->mseq))) {
-                       pr_err("handle_cap_import: mismatched seq/mseq: "
-                              "ino (%llx.%llx) mds%d seq %d mseq %d "
-                              "importer mds%d has peer seq %d mseq %d\n",
-                              ceph_vinop(inode), peer, ocap->seq,
-                              ocap->mseq, mds, le32_to_cpu(ph->seq),
-                              le32_to_cpu(ph->mseq));
+                       pr_err_ratelimited("handle_cap_import: "
+                                       "mismatched seq/mseq: ino (%llx.%llx) "
+                                       "mds%d seq %d mseq %d importer mds%d "
+                                       "has peer seq %d mseq %d\n",
+                                       ceph_vinop(inode), peer, ocap->seq,
+                                       ocap->mseq, mds, le32_to_cpu(ph->seq),
+                                       le32_to_cpu(ph->mseq));
                }
                __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
        }
@@ -3939,11 +3949,20 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
 
        cap = __get_cap_for_mds(ci, mds);
        if (cap && __cap_is_valid(cap)) {
-               if (force ||
-                   ((cap->issued & drop) &&
-                    (cap->issued & unless) == 0)) {
-                       if ((cap->issued & drop) &&
-                           (cap->issued & unless) == 0) {
+               unless &= cap->issued;
+               if (unless) {
+                       if (unless & CEPH_CAP_AUTH_EXCL)
+                               drop &= ~CEPH_CAP_AUTH_SHARED;
+                       if (unless & CEPH_CAP_LINK_EXCL)
+                               drop &= ~CEPH_CAP_LINK_SHARED;
+                       if (unless & CEPH_CAP_XATTR_EXCL)
+                               drop &= ~CEPH_CAP_XATTR_SHARED;
+                       if (unless & CEPH_CAP_FILE_EXCL)
+                               drop &= ~CEPH_CAP_FILE_SHARED;
+               }
+
+               if (force || (cap->issued & drop)) {
+                       if (cap->issued & drop) {
                                int wanted = __ceph_caps_wanted(ci);
                                if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
                                        wanted |= cap->mds_wanted;
@@ -3975,7 +3994,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
                        *p += sizeof(*rel);
                        ret = 1;
                } else {
-                       dout("encode_inode_release %p cap %p %s\n",
+                       dout("encode_inode_release %p cap %p %s (noop)\n",
                             inode, cap, ceph_cap_string(cap->issued));
                }
        }