ceph: add mount option to limit caps count
author Yan, Zheng <zyan@redhat.com>
Fri, 1 Feb 2019 06:57:15 +0000 (14:57 +0800)
committer Ilya Dryomov <idryomov@gmail.com>
Tue, 5 Mar 2019 17:55:17 +0000 (18:55 +0100)
If the number of caps exceeds the limit, ceph_trim_dentries() also trims
dentries with valid leases. Trimming a dentry releases its reference to the
associated inode, which may evict the inode and release its caps.

By default, there is no limit for caps count.

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Reviewed-by: Jeff Layton <jlayton@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Documentation/filesystems/ceph.txt
fs/ceph/caps.c
fs/ceph/dir.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.c
fs/ceph/super.h
include/linux/ceph/types.h

index 1177052..bc4145e 100644 (file)
@@ -118,6 +118,10 @@ Mount Options
        of a non-responsive Ceph file system.  The default is 30
        seconds.
 
+  caps_max=X
+       Specify the maximum number of caps to hold. Unused caps are released
+       when the number of caps exceeds the limit. The default is 0 (no limit).
+
   rbytes
        When stat() is called on a directory, set st_size to 'rbytes',
        the summation of file sizes over all files nested beneath that
index 6fbdc1a..36a8dc6 100644 (file)
@@ -148,11 +148,17 @@ void ceph_caps_finalize(struct ceph_mds_client *mdsc)
        spin_unlock(&mdsc->caps_list_lock);
 }
 
-void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
+void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
+                             struct ceph_mount_options *fsopt)
 {
        spin_lock(&mdsc->caps_list_lock);
-       mdsc->caps_min_count += delta;
-       BUG_ON(mdsc->caps_min_count < 0);
+       mdsc->caps_min_count = fsopt->max_readdir;
+       if (mdsc->caps_min_count < 1024)
+               mdsc->caps_min_count = 1024;
+       mdsc->caps_use_max = fsopt->caps_max;
+       if (mdsc->caps_use_max > 0 &&
+           mdsc->caps_use_max < mdsc->caps_min_count)
+               mdsc->caps_use_max = mdsc->caps_min_count;
        spin_unlock(&mdsc->caps_list_lock);
 }
 
@@ -272,6 +278,7 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
        if (!err) {
                BUG_ON(have + alloc != need);
                ctx->count = need;
+               ctx->used = 0;
        }
 
        spin_lock(&mdsc->caps_list_lock);
@@ -295,13 +302,24 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
 }
 
 void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
-                       struct ceph_cap_reservation *ctx)
+                        struct ceph_cap_reservation *ctx)
 {
+       bool reclaim = false;
+       if (!ctx->count)
+               return;
+
        dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
        spin_lock(&mdsc->caps_list_lock);
        __ceph_unreserve_caps(mdsc, ctx->count);
        ctx->count = 0;
+
+       if (mdsc->caps_use_max > 0 &&
+           mdsc->caps_use_count > mdsc->caps_use_max)
+               reclaim = true;
        spin_unlock(&mdsc->caps_list_lock);
+
+       if (reclaim)
+               ceph_reclaim_caps_nr(mdsc, ctx->used);
 }
 
 struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
@@ -346,6 +364,7 @@ struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
        BUG_ON(list_empty(&mdsc->caps_list));
 
        ctx->count--;
+       ctx->used++;
        mdsc->caps_reserve_count--;
        mdsc->caps_use_count++;
 
@@ -500,12 +519,12 @@ static void __insert_cap_node(struct ceph_inode_info *ci,
 static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
                               struct ceph_inode_info *ci)
 {
-       struct ceph_mount_options *ma = mdsc->fsc->mount_options;
+       struct ceph_mount_options *opt = mdsc->fsc->mount_options;
 
        ci->i_hold_caps_min = round_jiffies(jiffies +
-                                           ma->caps_wanted_delay_min * HZ);
+                                           opt->caps_wanted_delay_min * HZ);
        ci->i_hold_caps_max = round_jiffies(jiffies +
-                                           ma->caps_wanted_delay_max * HZ);
+                                           opt->caps_wanted_delay_max * HZ);
        dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
             ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
 }
index eba2835..a8f4298 100644 (file)
@@ -1224,6 +1224,7 @@ enum {
 
 struct ceph_lease_walk_control {
        bool dir_lease;
+       bool expire_dir_lease;
        unsigned long nr_to_scan;
        unsigned long dir_lease_ttl;
 };
@@ -1345,7 +1346,13 @@ static int __dir_lease_check(struct dentry *dentry, void *arg)
                /* Move dentry to tail of dir lease list if we don't want
                 * to delete it. So dentries in the list are checked in a
                 * round robin manner */
-               return TOUCH;
+               if (!lwc->expire_dir_lease)
+                       return TOUCH;
+               if (dentry->d_lockref.count > 0 ||
+                   (di->flags & CEPH_DENTRY_REFERENCED))
+                       return TOUCH;
+               /* invalidate dir lease */
+               di->lease_shared_gen = 0;
        }
        return DELETE;
 }
@@ -1353,8 +1360,17 @@ static int __dir_lease_check(struct dentry *dentry, void *arg)
 int ceph_trim_dentries(struct ceph_mds_client *mdsc)
 {
        struct ceph_lease_walk_control lwc;
+       unsigned long count;
        unsigned long freed;
 
+       spin_lock(&mdsc->caps_list_lock);
+        if (mdsc->caps_use_max > 0 &&
+            mdsc->caps_use_count > mdsc->caps_use_max)
+               count = mdsc->caps_use_count - mdsc->caps_use_max;
+       else
+               count = 0;
+        spin_unlock(&mdsc->caps_list_lock);
+
        lwc.dir_lease = false;
        lwc.nr_to_scan  = CEPH_CAPS_PER_RELEASE * 2;
        freed = __dentry_leases_walk(mdsc, &lwc, __dentry_lease_check);
@@ -1365,6 +1381,8 @@ int ceph_trim_dentries(struct ceph_mds_client *mdsc)
                lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE;
 
        lwc.dir_lease = true;
+       lwc.expire_dir_lease = freed < count;
+       lwc.dir_lease_ttl = mdsc->fsc->mount_options->caps_wanted_delay_max * HZ;
        freed +=__dentry_leases_walk(mdsc, &lwc, __dir_lease_check);
        if (!lwc.nr_to_scan) /* more to check */
                return -EAGAIN;
index 2095e5d..21c33ed 100644 (file)
@@ -1965,6 +1965,18 @@ void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
         }
 }
 
+void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
+{
+       int val;        /* pending count after adding nr; nr may be > 1 */
+       if (!nr)
+               return;
+       val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
+       if ((val % CEPH_CAPS_PER_RELEASE) < nr) { /* crossed a batch boundary */
+               atomic_set(&mdsc->cap_reclaim_pending, 0);
+               ceph_queue_cap_reclaim_work(mdsc);
+       }
+}
+
 /*
  * requests
  */
@@ -2878,7 +2890,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
                                    req->r_op == CEPH_MDS_OP_LSSNAP))
                        ceph_readdir_prepopulate(req, req->r_session);
-               ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
        }
        current->journal_info = NULL;
        mutex_unlock(&req->r_fill_mutex);
@@ -2887,12 +2898,18 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        if (realm)
                ceph_put_snap_realm(mdsc, realm);
 
-       if (err == 0 && req->r_target_inode &&
-           test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
-               struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
-               spin_lock(&ci->i_unsafe_lock);
-               list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
-               spin_unlock(&ci->i_unsafe_lock);
+       if (err == 0) {
+               if (req->r_target_inode &&
+                   test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
+                       struct ceph_inode_info *ci =
+                               ceph_inode(req->r_target_inode);
+                       spin_lock(&ci->i_unsafe_lock);
+                       list_add_tail(&req->r_unsafe_target_item,
+                                     &ci->i_unsafe_iops);
+                       spin_unlock(&ci->i_unsafe_lock);
+               }
+
+               ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
        }
 out_err:
        mutex_lock(&mdsc->mutex);
@@ -4083,13 +4100,14 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        spin_lock_init(&mdsc->cap_dirty_lock);
        init_waitqueue_head(&mdsc->cap_flushing_wq);
        INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
+       atomic_set(&mdsc->cap_reclaim_pending, 0);
 
        spin_lock_init(&mdsc->dentry_list_lock);
        INIT_LIST_HEAD(&mdsc->dentry_leases);
        INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
 
        ceph_caps_init(mdsc);
-       ceph_adjust_min_caps(mdsc, fsc->min_caps);
+       ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
 
        spin_lock_init(&mdsc->snapid_map_lock);
        mdsc->snapid_map_tree = RB_ROOT;
index 580b235..50385a4 100644 (file)
@@ -379,6 +379,7 @@ struct ceph_mds_client {
        wait_queue_head_t cap_flushing_wq;
 
        struct work_struct cap_reclaim_work;
+       atomic_t           cap_reclaim_pending;
 
        /*
         * Cap reservations
@@ -396,6 +397,7 @@ struct ceph_mds_client {
                                                unreserved) */
        int             caps_total_count;    /* total caps allocated */
        int             caps_use_count;      /* in use */
+       int             caps_use_max;        /* max used caps */
        int             caps_reserve_count;  /* unused, reserved */
        int             caps_avail_count;    /* unused, unreserved */
        int             caps_min_count;      /* keep at least this many
@@ -465,6 +467,7 @@ extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
 extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
                                    struct ceph_mds_session *session);
 extern void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc);
+extern void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr);
 extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
 
 extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
index 200836b..6d5bb2f 100644 (file)
@@ -133,6 +133,7 @@ enum {
        Opt_rasize,
        Opt_caps_wanted_delay_min,
        Opt_caps_wanted_delay_max,
+       Opt_caps_max,
        Opt_readdir_max_entries,
        Opt_readdir_max_bytes,
        Opt_congestion_kb,
@@ -175,6 +176,7 @@ static match_table_t fsopt_tokens = {
        {Opt_rasize, "rasize=%d"},
        {Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
        {Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
+       {Opt_caps_max, "caps_max=%d"},
        {Opt_readdir_max_entries, "readdir_max_entries=%d"},
        {Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
        {Opt_congestion_kb, "write_congestion_kb=%d"},
@@ -286,6 +288,11 @@ static int parse_fsopt_token(char *c, void *private)
                        return -EINVAL;
                fsopt->caps_wanted_delay_max = intval;
                break;
+       case Opt_caps_max:
+               if (intval < 0)
+                       return -EINVAL;
+               fsopt->caps_max = intval;
+               break;
        case Opt_readdir_max_entries:
                if (intval < 1)
                        return -EINVAL;
@@ -576,6 +583,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
                seq_printf(m, ",rasize=%d", fsopt->rasize);
        if (fsopt->congestion_kb != default_congestion_kb())
                seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
+       if (fsopt->caps_max)
+               seq_printf(m, ",caps_max=%d", fsopt->caps_max);
        if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
                seq_printf(m, ",caps_wanted_delay_min=%d",
                         fsopt->caps_wanted_delay_min);
@@ -683,9 +692,6 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
        if (!fsc->wb_pagevec_pool)
                goto fail_cap_wq;
 
-       /* caps */
-       fsc->min_caps = fsopt->max_readdir;
-
        return fsc;
 
 fail_cap_wq:
index b3bcfb3..16c0318 100644 (file)
@@ -79,6 +79,7 @@ struct ceph_mount_options {
        int rasize;           /* max readahead */
        int congestion_kb;    /* max writeback in flight */
        int caps_wanted_delay_min, caps_wanted_delay_max;
+       int caps_max;
        int max_readdir;       /* max readdir result (entires) */
        int max_readdir_bytes; /* max readdir result (bytes) */
 
@@ -100,7 +101,6 @@ struct ceph_fs_client {
        struct ceph_client *client;
 
        unsigned long mount_state;
-       int min_caps;                  /* min caps i added */
        loff_t max_file_size;
 
        struct ceph_mds_client *mdsc;
@@ -668,7 +668,8 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check);
 
 extern void ceph_caps_init(struct ceph_mds_client *mdsc);
 extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
-extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
+extern void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
+                                    struct ceph_mount_options *fsopt);
 extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
                             struct ceph_cap_reservation *ctx, int need);
 extern void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
index 27cd973..bd3d532 100644 (file)
@@ -24,6 +24,7 @@ struct ceph_vino {
 /* context for the caps reservation mechanism */
 struct ceph_cap_reservation {
        int count;
+       int used;
 };