Merge branch 'misc.namei' of git://git.kernel.org/pub/scm/linux/kernel/git/viro/vfs
[linux-2.6-microblaze.git] / fs / ceph / caps.c
index a5e93b1..6c0e52f 100644 (file)
@@ -645,9 +645,7 @@ void ceph_add_cap(struct inode *inode,
        dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
             session->s_mds, cap_id, ceph_cap_string(issued), seq);
 
-       spin_lock(&session->s_gen_ttl_lock);
-       gen = session->s_cap_gen;
-       spin_unlock(&session->s_gen_ttl_lock);
+       gen = atomic_read(&session->s_cap_gen);
 
        cap = __get_cap_for_mds(ci, mds);
        if (!cap) {
@@ -705,29 +703,12 @@ void ceph_add_cap(struct inode *inode,
                 */
                struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
                                                               realmino);
-               if (realm) {
-                       struct ceph_snap_realm *oldrealm = ci->i_snap_realm;
-                       if (oldrealm) {
-                               spin_lock(&oldrealm->inodes_with_caps_lock);
-                               list_del_init(&ci->i_snap_realm_item);
-                               spin_unlock(&oldrealm->inodes_with_caps_lock);
-                       }
-
-                       spin_lock(&realm->inodes_with_caps_lock);
-                       list_add(&ci->i_snap_realm_item,
-                                &realm->inodes_with_caps);
-                       ci->i_snap_realm = realm;
-                       if (realm->ino == ci->i_vino.ino)
-                               realm->inode = inode;
-                       spin_unlock(&realm->inodes_with_caps_lock);
-
-                       if (oldrealm)
-                               ceph_put_snap_realm(mdsc, oldrealm);
-               } else {
-                       pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
-                              realmino);
-                       WARN_ON(!realm);
-               }
+               if (realm)
+                       ceph_change_snap_realm(inode, realm);
+               else
+                       WARN(1, "%s: couldn't find snap realm 0x%llx (ino 0x%llx oldrealm 0x%llx)\n",
+                            __func__, realmino, ci->i_vino.ino,
+                            ci->i_snap_realm ? ci->i_snap_realm->ino : 0);
        }
 
        __check_cap_issue(ci, cap, issued);
@@ -785,10 +766,8 @@ static int __cap_is_valid(struct ceph_cap *cap)
        unsigned long ttl;
        u32 gen;
 
-       spin_lock(&cap->session->s_gen_ttl_lock);
-       gen = cap->session->s_cap_gen;
+       gen = atomic_read(&cap->session->s_cap_gen);
        ttl = cap->session->s_cap_ttl;
-       spin_unlock(&cap->session->s_gen_ttl_lock);
 
        if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
                dout("__cap_is_valid %p cap %p issued %s "
@@ -1116,20 +1095,6 @@ int ceph_is_any_caps(struct inode *inode)
        return ret;
 }
 
-static void drop_inode_snap_realm(struct ceph_inode_info *ci)
-{
-       struct ceph_snap_realm *realm = ci->i_snap_realm;
-       spin_lock(&realm->inodes_with_caps_lock);
-       list_del_init(&ci->i_snap_realm_item);
-       ci->i_snap_realm_counter++;
-       ci->i_snap_realm = NULL;
-       if (realm->ino == ci->i_vino.ino)
-               realm->inode = NULL;
-       spin_unlock(&realm->inodes_with_caps_lock);
-       ceph_put_snap_realm(ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc,
-                           realm);
-}
-
 /*
  * Remove a cap.  Take steps to deal with a racing iterate_session_caps.
  *
@@ -1149,17 +1114,16 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
                return;
        }
 
+       lockdep_assert_held(&ci->i_ceph_lock);
+
        dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
 
        mdsc = ceph_inode_to_client(&ci->vfs_inode)->mdsc;
 
        /* remove from inode's cap rbtree, and clear auth cap */
        rb_erase(&cap->ci_node, &ci->i_caps);
-       if (ci->i_auth_cap == cap) {
-               WARN_ON_ONCE(!list_empty(&ci->i_dirty_item) &&
-                            !mdsc->fsc->blocklisted);
+       if (ci->i_auth_cap == cap)
                ci->i_auth_cap = NULL;
-       }
 
        /* remove from session list */
        spin_lock(&session->s_cap_lock);
@@ -1182,7 +1146,8 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
         * s_cap_gen while session is in the reconnect state.
         */
        if (queue_release &&
-           (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
+           (!session->s_cap_reconnect ||
+            cap->cap_gen == atomic_read(&session->s_cap_gen))) {
                cap->queue_release = 1;
                if (removed) {
                        __ceph_queue_cap_release(session, cap);
@@ -1204,12 +1169,34 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
                 * keep i_snap_realm.
                 */
                if (ci->i_wr_ref == 0 && ci->i_snap_realm)
-                       drop_inode_snap_realm(ci);
+                       ceph_change_snap_realm(&ci->vfs_inode, NULL);
 
                __cap_delay_cancel(mdsc, ci);
        }
 }
 
+void ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
+{
+       struct ceph_inode_info *ci = cap->ci;
+       struct ceph_fs_client *fsc;
+
+       /* 'ci' being NULL means the remove has already occurred */
+       if (!ci) {
+               dout("%s: cap inode is NULL\n", __func__);
+               return;
+       }
+
+       lockdep_assert_held(&ci->i_ceph_lock);
+
+       fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
+       WARN_ON_ONCE(ci->i_auth_cap == cap &&
+                    !list_empty(&ci->i_dirty_item) &&
+                    !fsc->blocklisted &&
+                    READ_ONCE(fsc->mount_state) != CEPH_MOUNT_SHUTDOWN);
+
+       __ceph_remove_cap(cap, queue_release);
+}
+
 struct cap_msg_args {
        struct ceph_mds_session *session;
        u64                     ino, cid, follows;
@@ -1338,7 +1325,7 @@ void __ceph_remove_caps(struct ceph_inode_info *ci)
        while (p) {
                struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
                p = rb_next(p);
-               __ceph_remove_cap(cap, true);
+               ceph_remove_cap(cap, true);
        }
        spin_unlock(&ci->i_ceph_lock);
 }
@@ -1534,7 +1521,7 @@ static inline int __send_flush_snap(struct inode *inode,
  * asynchronously back to the MDS once sync writes complete and dirty
  * data is written out.
  *
- * Called under i_ceph_lock.  Takes s_mutex as needed.
+ * Called under i_ceph_lock.
  */
 static void __ceph_flush_snaps(struct ceph_inode_info *ci,
                               struct ceph_mds_session *session)
@@ -1656,7 +1643,6 @@ retry:
        mds = ci->i_auth_cap->session->s_mds;
        if (session && session->s_mds != mds) {
                dout(" oops, wrong session %p mutex\n", session);
-               mutex_unlock(&session->s_mutex);
                ceph_put_mds_session(session);
                session = NULL;
        }
@@ -1665,10 +1651,6 @@ retry:
                mutex_lock(&mdsc->mutex);
                session = __ceph_lookup_mds_session(mdsc, mds);
                mutex_unlock(&mdsc->mutex);
-               if (session) {
-                       dout(" inverting session/ino locks on %p\n", session);
-                       mutex_lock(&session->s_mutex);
-               }
                goto retry;
        }
 
@@ -1680,12 +1662,10 @@ retry:
 out:
        spin_unlock(&ci->i_ceph_lock);
 
-       if (psession) {
+       if (psession)
                *psession = session;
-       } else if (session) {
-               mutex_unlock(&session->s_mutex);
+       else
                ceph_put_mds_session(session);
-       }
        /* we flushed them all; remove this inode from the queue */
        spin_lock(&mdsc->snap_flush_lock);
        list_del_init(&ci->i_snap_flush_item);
@@ -1753,7 +1733,14 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
 
 struct ceph_cap_flush *ceph_alloc_cap_flush(void)
 {
-       return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
+       struct ceph_cap_flush *cf;
+
+       cf = kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
+       if (!cf)
+               return NULL;
+
+       cf->is_capsnap = false;
+       return cf;
 }
 
 void ceph_free_cap_flush(struct ceph_cap_flush *cf)
@@ -1788,7 +1775,7 @@ static bool __detach_cap_flush_from_mdsc(struct ceph_mds_client *mdsc,
                prev->wake = true;
                wake = false;
        }
-       list_del(&cf->g_list);
+       list_del_init(&cf->g_list);
        return wake;
 }
 
@@ -1803,7 +1790,7 @@ static bool __detach_cap_flush_from_ci(struct ceph_inode_info *ci,
                prev->wake = true;
                wake = false;
        }
-       list_del(&cf->i_list);
+       list_del_init(&cf->i_list);
        return wake;
 }
 
@@ -1862,6 +1849,8 @@ static u64 __mark_caps_flushing(struct inode *inode,
  * try to invalidate mapping pages without blocking.
  */
 static int try_nonblocking_invalidate(struct inode *inode)
+       __releases(ci->i_ceph_lock)
+       __acquires(ci->i_ceph_lock)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        u32 invalidating_gen = ci->i_rdcache_gen;
@@ -1915,7 +1904,6 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
        struct ceph_cap *cap;
        u64 flush_tid, oldest_flush_tid;
        int file_wanted, used, cap_used;
-       int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
        int issued, implemented, want, retain, revoking, flushing = 0;
        int mds = -1;   /* keep track of how far we've gone through i_caps list
                           to avoid an infinite loop on retry */
@@ -1923,14 +1911,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
        bool queue_invalidate = false;
        bool tried_invalidate = false;
 
+       if (session)
+               ceph_get_mds_session(session);
+
        spin_lock(&ci->i_ceph_lock);
        if (ci->i_ceph_flags & CEPH_I_FLUSH)
                flags |= CHECK_CAPS_FLUSH;
-
-       goto retry_locked;
 retry:
-       spin_lock(&ci->i_ceph_lock);
-retry_locked:
        /* Caps wanted by virtue of active open files. */
        file_wanted = __ceph_caps_file_wanted(ci);
 
@@ -2010,7 +1997,7 @@ retry_locked:
                        ci->i_rdcache_revoking = ci->i_rdcache_gen;
                }
                tried_invalidate = true;
-               goto retry_locked;
+               goto retry;
        }
 
        for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
@@ -2024,8 +2011,6 @@ retry_locked:
                    ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
                        continue;
 
-               /* NOTE: no side-effects allowed, until we take s_mutex */
-
                /*
                 * If we have an auth cap, we don't need to consider any
                 * overlapping caps as used.
@@ -2088,37 +2073,8 @@ retry_locked:
                        continue;     /* nope, all good */
 
 ack:
-               if (session && session != cap->session) {
-                       dout("oops, wrong session %p mutex\n", session);
-                       mutex_unlock(&session->s_mutex);
-                       session = NULL;
-               }
-               if (!session) {
-                       session = cap->session;
-                       if (mutex_trylock(&session->s_mutex) == 0) {
-                               dout("inverting session/ino locks on %p\n",
-                                    session);
-                               session = ceph_get_mds_session(session);
-                               spin_unlock(&ci->i_ceph_lock);
-                               if (took_snap_rwsem) {
-                                       up_read(&mdsc->snap_rwsem);
-                                       took_snap_rwsem = 0;
-                               }
-                               if (session) {
-                                       mutex_lock(&session->s_mutex);
-                                       ceph_put_mds_session(session);
-                               } else {
-                                       /*
-                                        * Because we take the reference while
-                                        * holding the i_ceph_lock, it should
-                                        * never be NULL. Throw a warning if it
-                                        * ever is.
-                                        */
-                                       WARN_ON_ONCE(true);
-                               }
-                               goto retry;
-                       }
-               }
+               ceph_put_mds_session(session);
+               session = ceph_get_mds_session(cap->session);
 
                /* kick flushing and flush snaps before sending normal
                 * cap message */
@@ -2130,20 +2086,7 @@ ack:
                        if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
                                __ceph_flush_snaps(ci, session);
 
-                       goto retry_locked;
-               }
-
-               /* take snap_rwsem after session mutex */
-               if (!took_snap_rwsem) {
-                       if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
-                               dout("inverting snap/in locks on %p\n",
-                                    inode);
-                               spin_unlock(&ci->i_ceph_lock);
-                               down_read(&mdsc->snap_rwsem);
-                               took_snap_rwsem = 1;
-                               goto retry;
-                       }
-                       took_snap_rwsem = 1;
+                       goto retry;
                }
 
                if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
@@ -2165,9 +2108,10 @@ ack:
 
                __prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used,
                           want, retain, flushing, flush_tid, oldest_flush_tid);
-               spin_unlock(&ci->i_ceph_lock);
 
+               spin_unlock(&ci->i_ceph_lock);
                __send_cap(&arg, ci);
+               spin_lock(&ci->i_ceph_lock);
 
                goto retry; /* retake i_ceph_lock and restart our cap scan. */
        }
@@ -2182,13 +2126,9 @@ ack:
 
        spin_unlock(&ci->i_ceph_lock);
 
+       ceph_put_mds_session(session);
        if (queue_invalidate)
                ceph_queue_invalidate(inode);
-
-       if (session)
-               mutex_unlock(&session->s_mutex);
-       if (took_snap_rwsem)
-               up_read(&mdsc->snap_rwsem);
 }
 
 /*
@@ -2198,26 +2138,17 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
 {
        struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_mds_session *session = NULL;
        int flushing = 0;
        u64 flush_tid = 0, oldest_flush_tid = 0;
 
-retry:
        spin_lock(&ci->i_ceph_lock);
 retry_locked:
        if (ci->i_dirty_caps && ci->i_auth_cap) {
                struct ceph_cap *cap = ci->i_auth_cap;
                struct cap_msg_args arg;
+               struct ceph_mds_session *session = cap->session;
 
-               if (session != cap->session) {
-                       spin_unlock(&ci->i_ceph_lock);
-                       if (session)
-                               mutex_unlock(&session->s_mutex);
-                       session = cap->session;
-                       mutex_lock(&session->s_mutex);
-                       goto retry;
-               }
-               if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) {
+               if (session->s_state < CEPH_MDS_SESSION_OPEN) {
                        spin_unlock(&ci->i_ceph_lock);
                        goto out;
                }
@@ -2254,9 +2185,6 @@ retry_locked:
                spin_unlock(&ci->i_ceph_lock);
        }
 out:
-       if (session)
-               mutex_unlock(&session->s_mutex);
-
        *ptid = flush_tid;
        return flushing;
 }
@@ -2286,6 +2214,7 @@ static int caps_are_flushed(struct inode *inode, u64 flush_tid)
  */
 static int unsafe_request_wait(struct inode *inode)
 {
+       struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_mds_request *req1 = NULL, *req2 = NULL;
        int ret, err = 0;
@@ -2305,6 +2234,81 @@ static int unsafe_request_wait(struct inode *inode)
        }
        spin_unlock(&ci->i_unsafe_lock);
 
+       /*
+        * Trigger to flush the journal logs in all the relevant MDSes
+        * manually, or in the worst case we must wait at most 5 seconds
+        * for the journal logs to be flushed by the MDSes periodically.
+        */
+       if (req1 || req2) {
+               struct ceph_mds_session **sessions = NULL;
+               struct ceph_mds_session *s;
+               struct ceph_mds_request *req;
+               unsigned int max;
+               int i;
+
+               /*
+                * The mdsc->max_sessions is unlikely to be changed
+                * mostly, here we will retry it by reallocating the
+                * sessions array memory to get rid of the mdsc->mutex
+                * lock.
+                */
+retry:
+               max = mdsc->max_sessions;
+               sessions = krealloc(sessions, max * sizeof(s), GFP_KERNEL | __GFP_ZERO);
+               if (!sessions)
+                       return -ENOMEM;
+
+               spin_lock(&ci->i_unsafe_lock);
+               if (req1) {
+                       list_for_each_entry(req, &ci->i_unsafe_dirops,
+                                           r_unsafe_dir_item) {
+                               s = req->r_session;
+                               if (unlikely(s->s_mds >= max)) {
+                                       spin_unlock(&ci->i_unsafe_lock);
+                                       goto retry;
+                               }
+                               if (!sessions[s->s_mds]) {
+                                       s = ceph_get_mds_session(s);
+                                       sessions[s->s_mds] = s;
+                               }
+                       }
+               }
+               if (req2) {
+                       list_for_each_entry(req, &ci->i_unsafe_iops,
+                                           r_unsafe_target_item) {
+                               s = req->r_session;
+                               if (unlikely(s->s_mds >= max)) {
+                                       spin_unlock(&ci->i_unsafe_lock);
+                                       goto retry;
+                               }
+                               if (!sessions[s->s_mds]) {
+                                       s = ceph_get_mds_session(s);
+                                       sessions[s->s_mds] = s;
+                               }
+                       }
+               }
+               spin_unlock(&ci->i_unsafe_lock);
+
+               /* the auth MDS */
+               spin_lock(&ci->i_ceph_lock);
+               if (ci->i_auth_cap) {
+                     s = ci->i_auth_cap->session;
+                     if (!sessions[s->s_mds])
+                             sessions[s->s_mds] = ceph_get_mds_session(s);
+               }
+               spin_unlock(&ci->i_ceph_lock);
+
+               /* send flush mdlog request to MDSes */
+               for (i = 0; i < max; i++) {
+                       s = sessions[i];
+                       if (s) {
+                               send_flush_mdlog(s);
+                               ceph_put_mds_session(s);
+                       }
+               }
+               kfree(sessions);
+       }
+
        dout("unsafe_request_wait %p wait on tid %llu %llu\n",
             inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
        if (req1) {
@@ -2423,7 +2427,7 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
        ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
 
        list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) {
-               if (!cf->caps) {
+               if (cf->is_capsnap) {
                        last_snap_flush = cf->tid;
                        break;
                }
@@ -2442,7 +2446,7 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
 
                first_tid = cf->tid + 1;
 
-               if (cf->caps) {
+               if (!cf->is_capsnap) {
                        struct cap_msg_args arg;
 
                        dout("kick_flushing_caps %p cap %p tid %llu %s\n",
@@ -3075,7 +3079,7 @@ static void __ceph_put_cap_refs(struct ceph_inode_info *ci, int had,
                        }
                        /* see comment in __ceph_remove_cap() */
                        if (!__ceph_is_any_real_caps(ci) && ci->i_snap_realm)
-                               drop_inode_snap_realm(ci);
+                               ceph_change_snap_realm(inode, NULL);
                }
        }
        if (check_flushsnaps && __ceph_have_pending_cap_snap(ci)) {
@@ -3181,7 +3185,16 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                                break;
                        }
                }
-               BUG_ON(!found);
+
+               if (!found) {
+                       /*
+                        * The capsnap should already be removed when removing
+                        * auth cap in the case of a forced unmount.
+                        */
+                       WARN_ON_ONCE(ci->i_auth_cap);
+                       goto unlock;
+               }
+
                capsnap->dirty_pages -= nr;
                if (capsnap->dirty_pages == 0) {
                        complete_capsnap = true;
@@ -3203,6 +3216,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                     complete_capsnap ? " (complete capsnap)" : "");
        }
 
+unlock:
        spin_unlock(&ci->i_ceph_lock);
 
        if (last) {
@@ -3213,8 +3227,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
        if (complete_capsnap)
                wake_up_all(&ci->i_cap_wq);
        while (put-- > 0) {
-               /* avoid calling iput_final() in osd dispatch threads */
-               ceph_async_iput(inode);
+               iput(inode);
        }
 }
 
@@ -3288,7 +3301,7 @@ static void handle_cap_grant(struct inode *inode,
        u64 size = le64_to_cpu(grant->size);
        u64 max_size = le64_to_cpu(grant->max_size);
        unsigned char check_caps = 0;
-       bool was_stale = cap->cap_gen < session->s_cap_gen;
+       bool was_stale = cap->cap_gen < atomic_read(&session->s_cap_gen);
        bool wake = false;
        bool writeback = false;
        bool queue_trunc = false;
@@ -3340,7 +3353,7 @@ static void handle_cap_grant(struct inode *inode,
        }
 
        /* side effects now are allowed */
-       cap->cap_gen = session->s_cap_gen;
+       cap->cap_gen = atomic_read(&session->s_cap_gen);
        cap->seq = seq;
 
        __check_cap_issue(ci, cap, newcaps);
@@ -3553,13 +3566,12 @@ static void handle_cap_grant(struct inode *inode,
        if (wake)
                wake_up_all(&ci->i_cap_wq);
 
+       mutex_unlock(&session->s_mutex);
        if (check_caps == 1)
                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY | CHECK_CAPS_NOINVAL,
                                session);
        else if (check_caps == 2)
                ceph_check_caps(ci, CHECK_CAPS_NOINVAL, session);
-       else
-               mutex_unlock(&session->s_mutex);
 }
 
 /*
@@ -3589,7 +3601,7 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
                        cleaned = cf->caps;
 
                /* Is this a capsnap? */
-               if (cf->caps == 0)
+               if (cf->is_capsnap)
                        continue;
 
                if (cf->tid <= flush_tid) {
@@ -3662,8 +3674,9 @@ out:
        while (!list_empty(&to_remove)) {
                cf = list_first_entry(&to_remove,
                                      struct ceph_cap_flush, i_list);
-               list_del(&cf->i_list);
-               ceph_free_cap_flush(cf);
+               list_del_init(&cf->i_list);
+               if (!cf->is_capsnap)
+                       ceph_free_cap_flush(cf);
        }
 
        if (wake_ci)
@@ -3674,6 +3687,43 @@ out:
                iput(inode);
 }
 
+void __ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
+                          bool *wake_ci, bool *wake_mdsc)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
+       bool ret;
+
+       lockdep_assert_held(&ci->i_ceph_lock);
+
+       dout("removing capsnap %p, inode %p ci %p\n", capsnap, inode, ci);
+
+       list_del_init(&capsnap->ci_item);
+       ret = __detach_cap_flush_from_ci(ci, &capsnap->cap_flush);
+       if (wake_ci)
+               *wake_ci = ret;
+
+       spin_lock(&mdsc->cap_dirty_lock);
+       if (list_empty(&ci->i_cap_flush_list))
+               list_del_init(&ci->i_flushing_item);
+
+       ret = __detach_cap_flush_from_mdsc(mdsc, &capsnap->cap_flush);
+       if (wake_mdsc)
+               *wake_mdsc = ret;
+       spin_unlock(&mdsc->cap_dirty_lock);
+}
+
+void ceph_remove_capsnap(struct inode *inode, struct ceph_cap_snap *capsnap,
+                        bool *wake_ci, bool *wake_mdsc)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+
+       lockdep_assert_held(&ci->i_ceph_lock);
+
+       WARN_ON_ONCE(capsnap->dirty_pages || capsnap->writing);
+       __ceph_remove_capsnap(inode, capsnap, wake_ci, wake_mdsc);
+}
+
 /*
  * Handle FLUSHSNAP_ACK.  MDS has flushed snap data to disk and we can
  * throw away our cap_snap.
@@ -3711,23 +3761,10 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
                             capsnap, capsnap->follows);
                }
        }
-       if (flushed) {
-               WARN_ON(capsnap->dirty_pages || capsnap->writing);
-               dout(" removing %p cap_snap %p follows %lld\n",
-                    inode, capsnap, follows);
-               list_del(&capsnap->ci_item);
-               wake_ci |= __detach_cap_flush_from_ci(ci, &capsnap->cap_flush);
-
-               spin_lock(&mdsc->cap_dirty_lock);
-
-               if (list_empty(&ci->i_cap_flush_list))
-                       list_del_init(&ci->i_flushing_item);
-
-               wake_mdsc |= __detach_cap_flush_from_mdsc(mdsc,
-                                                         &capsnap->cap_flush);
-               spin_unlock(&mdsc->cap_dirty_lock);
-       }
+       if (flushed)
+               ceph_remove_capsnap(inode, capsnap, &wake_ci, &wake_mdsc);
        spin_unlock(&ci->i_ceph_lock);
+
        if (flushed) {
                ceph_put_snap_context(capsnap->context);
                ceph_put_cap_snap(capsnap);
@@ -3811,7 +3848,7 @@ retry:
                goto out_unlock;
 
        if (target < 0) {
-               __ceph_remove_cap(cap, false);
+               ceph_remove_cap(cap, false);
                goto out_unlock;
        }
 
@@ -3846,7 +3883,7 @@ retry:
                                change_auth_cap_ses(ci, tcap->session);
                        }
                }
-               __ceph_remove_cap(cap, false);
+               ceph_remove_cap(cap, false);
                goto out_unlock;
        } else if (tsession) {
                /* add placeholder for the export tagert */
@@ -3863,7 +3900,7 @@ retry:
                        spin_unlock(&mdsc->cap_dirty_lock);
                }
 
-               __ceph_remove_cap(cap, false);
+               ceph_remove_cap(cap, false);
                goto out_unlock;
        }
 
@@ -3974,7 +4011,7 @@ retry:
                                        ocap->mseq, mds, le32_to_cpu(ph->seq),
                                        le32_to_cpu(ph->mseq));
                }
-               __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
+               ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
        }
 
        *old_issued = issued;
@@ -4202,9 +4239,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 done:
        mutex_unlock(&session->s_mutex);
 done_unlocked:
+       iput(inode);
+out:
        ceph_put_string(extra_info.pool_ns);
-       /* avoid calling iput_final() in mds dispatch threads */
-       ceph_async_iput(inode);
        return;
 
 flush_cap_releases:
@@ -4219,16 +4256,24 @@ flush_cap_releases:
 bad:
        pr_err("ceph_handle_caps: corrupt message\n");
        ceph_msg_dump(msg);
-       return;
+       goto out;
 }
 
 /*
  * Delayed work handler to process end of delayed cap release LRU list.
+ *
+ * If new caps are added to the list while processing it, these won't get
+ * processed in this run.  In this case, the ci->i_hold_caps_max will be
+ * returned so that the work can be scheduled accordingly.
  */
-void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
+unsigned long ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
 {
        struct inode *inode;
        struct ceph_inode_info *ci;
+       struct ceph_mount_options *opt = mdsc->fsc->mount_options;
+       unsigned long delay_max = opt->caps_wanted_delay_max * HZ;
+       unsigned long loop_start = jiffies;
+       unsigned long delay = 0;
 
        dout("check_delayed_caps\n");
        spin_lock(&mdsc->cap_delay_lock);
@@ -4236,6 +4281,11 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
                ci = list_first_entry(&mdsc->cap_delay_list,
                                      struct ceph_inode_info,
                                      i_cap_delay_list);
+               if (time_before(loop_start, ci->i_hold_caps_max - delay_max)) {
+                       dout("%s caps added recently.  Exiting loop", __func__);
+                       delay = ci->i_hold_caps_max;
+                       break;
+               }
                if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
                    time_before(jiffies, ci->i_hold_caps_max))
                        break;
@@ -4246,12 +4296,13 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
                        spin_unlock(&mdsc->cap_delay_lock);
                        dout("check_delayed_caps on %p\n", inode);
                        ceph_check_caps(ci, 0, NULL);
-                       /* avoid calling iput_final() in tick thread */
-                       ceph_async_iput(inode);
+                       iput(inode);
                        spin_lock(&mdsc->cap_delay_lock);
                }
        }
        spin_unlock(&mdsc->cap_delay_lock);
+
+       return delay;
 }
 
 /*
@@ -4280,33 +4331,9 @@ static void flush_dirty_session_caps(struct ceph_mds_session *s)
        dout("flush_dirty_caps done\n");
 }
 
-static void iterate_sessions(struct ceph_mds_client *mdsc,
-                            void (*cb)(struct ceph_mds_session *))
-{
-       int mds;
-
-       mutex_lock(&mdsc->mutex);
-       for (mds = 0; mds < mdsc->max_sessions; ++mds) {
-               struct ceph_mds_session *s;
-
-               if (!mdsc->sessions[mds])
-                       continue;
-
-               s = ceph_get_mds_session(mdsc->sessions[mds]);
-               if (!s)
-                       continue;
-
-               mutex_unlock(&mdsc->mutex);
-               cb(s);
-               ceph_put_mds_session(s);
-               mutex_lock(&mdsc->mutex);
-       }
-       mutex_unlock(&mdsc->mutex);
-}
-
 void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
 {
-       iterate_sessions(mdsc, flush_dirty_session_caps);
+       ceph_mdsc_iterate_sessions(mdsc, flush_dirty_session_caps, true);
 }
 
 void __ceph_touch_fmode(struct ceph_inode_info *ci,