Merge tag 'ceph-for-5.18-rc1' of https://github.com/ceph/ceph-client
author Linus Torvalds <torvalds@linux-foundation.org>
Fri, 25 Mar 2022 01:32:48 +0000 (18:32 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
Fri, 25 Mar 2022 01:32:48 +0000 (18:32 -0700)
Pull ceph updates from Ilya Dryomov:
 "The highlights are:

   - several changes to how snap context and snap realms are tracked
     (Xiubo Li). In particular, this should resolve a long-standing
     issue of high kworker CPU usage and various stalls caused by
     needless iteration over all inodes in the snap realm.

   - async create fixes to address hangs in some edge cases (Jeff
     Layton)

   - support for getvxattr MDS op for querying server-side xattrs, such
     as file/directory layouts and ephemeral pins (Milind Changire)

   - average latency is now maintained for all metrics (Venky Shankar)

   - some tweaks around handling inline data to make it fit better with
     the netfs helper library (David Howells)

  Also a couple of memory leaks got plugged along with a few assorted
  fixups. Last but not least, Xiubo has stepped up to serve as a CephFS
  co-maintainer"
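
To illustrate the getvxattr highlight above from userspace: server-side vxattrs are exposed through the regular extended-attribute interface, so a client can query them with getxattr(2). A minimal sketch, assuming a CephFS mount at /mnt/cephfs and the ceph.dir.layout vxattr name (both are illustrative choices, not taken from this merge):

        #include <stdio.h>
        #include <sys/xattr.h>

        int main(void)
        {
                char buf[256];
                /* mount path and vxattr name are assumptions for illustration */
                ssize_t len = getxattr("/mnt/cephfs/mydir", "ceph.dir.layout",
                                       buf, sizeof(buf) - 1);

                if (len < 0) {
                        perror("getxattr");
                        return 1;
                }
                buf[len] = '\0';
                printf("ceph.dir.layout: %s\n", buf);
                return 0;
        }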

* tag 'ceph-for-5.18-rc1' of https://github.com/ceph/ceph-client: (27 commits)
  ceph: fix memory leak in ceph_readdir when note_last_dentry returns error
  ceph: uninitialized variable in debug output
  ceph: use tracked average r/w/m latencies to display metrics in debugfs
  ceph: include average/stdev r/w/m latency in mds metrics
  ceph: track average r/w/m latency
  ceph: use ktime_to_timespec64() rather than jiffies_to_timespec64()
  ceph: assign the ci only when the inode isn't NULL
  ceph: fix inode reference leakage in ceph_get_snapdir()
  ceph: misc fix for code style and logs
  ceph: allocate capsnap memory outside of ceph_queue_cap_snap()
  ceph: do not release the global snaprealm until unmounting
  ceph: remove incorrect and unused CEPH_INO_DOTDOT macro
  MAINTAINERS: add Xiubo Li as cephfs co-maintainer
  ceph: eliminate the recursion when rebuilding the snap context
  ceph: do not update snapshot context when there is no new snapshot
  ceph: zero the dir_entries memory when allocating it
  ceph: move to a dedicated slabcache for ceph_cap_snap
  ceph: add getvxattr op
  libceph: drop else branches in prepare_read_data{,_cont}
  ceph: fix comments mentioning i_mutex
  ...

20 files changed:
MAINTAINERS
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/debugfs.c
fs/ceph/dir.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/locks.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/metric.c
fs/ceph/metric.h
fs/ceph/snap.c
fs/ceph/strings.c
fs/ceph/super.c
fs/ceph/super.h
fs/ceph/xattr.c
include/linux/ceph/ceph_fs.h
include/linux/ceph/libceph.h
net/ceph/messenger_v2.c

diff --git a/MAINTAINERS b/MAINTAINERS
index e1c8399..5b4e73f 100644
@@ -4456,6 +4456,7 @@ F:        drivers/power/supply/cw2015_battery.c
 CEPH COMMON CODE (LIBCEPH)
 M:     Ilya Dryomov <idryomov@gmail.com>
 M:     Jeff Layton <jlayton@kernel.org>
+M:     Xiubo Li <xiubli@redhat.com>
 L:     ceph-devel@vger.kernel.org
 S:     Supported
 W:     http://ceph.com/
@@ -4466,6 +4467,7 @@ F:        net/ceph/
 
 CEPH DISTRIBUTED FILE SYSTEM CLIENT (CEPH)
 M:     Jeff Layton <jlayton@kernel.org>
+M:     Xiubo Li <xiubli@redhat.com>
 M:     Ilya Dryomov <idryomov@gmail.com>
 L:     ceph-devel@vger.kernel.org
 S:     Supported
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index f6135c9..c7a0ab0 100644
@@ -184,7 +184,7 @@ static int ceph_releasepage(struct page *page, gfp_t gfp)
 
 static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq)
 {
-       struct inode *inode = rreq->mapping->host;
+       struct inode *inode = rreq->inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_file_layout *lo = &ci->i_layout;
        u32 blockoff;
@@ -201,7 +201,7 @@ static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq)
 
 static bool ceph_netfs_clamp_length(struct netfs_read_subrequest *subreq)
 {
-       struct inode *inode = subreq->rreq->mapping->host;
+       struct inode *inode = subreq->rreq->inode;
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 objno, objoff;
@@ -244,10 +244,63 @@ static void finish_netfs_read(struct ceph_osd_request *req)
        iput(req->r_inode);
 }
 
+static bool ceph_netfs_issue_op_inline(struct netfs_read_subrequest *subreq)
+{
+       struct netfs_read_request *rreq = subreq->rreq;
+       struct inode *inode = rreq->inode;
+       struct ceph_mds_reply_info_parsed *rinfo;
+       struct ceph_mds_reply_info_in *iinfo;
+       struct ceph_mds_request *req;
+       struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct iov_iter iter;
+       ssize_t err = 0;
+       size_t len;
+
+       __set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
+       __clear_bit(NETFS_SREQ_WRITE_TO_CACHE, &subreq->flags);
+
+       if (subreq->start >= inode->i_size)
+               goto out;
+
+       /* We need to fetch the inline data. */
+       req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
+       if (IS_ERR(req)) {
+               err = PTR_ERR(req);
+               goto out;
+       }
+       req->r_ino1 = ci->i_vino;
+       req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INLINE_DATA);
+       req->r_num_caps = 2;
+
+       err = ceph_mdsc_do_request(mdsc, NULL, req);
+       if (err < 0)
+               goto out;
+
+       rinfo = &req->r_reply_info;
+       iinfo = &rinfo->targeti;
+       if (iinfo->inline_version == CEPH_INLINE_NONE) {
+               /* The data got uninlined */
+               ceph_mdsc_put_request(req);
+               return false;
+       }
+
+       len = min_t(size_t, iinfo->inline_len - subreq->start, subreq->len);
+       iov_iter_xarray(&iter, READ, &rreq->mapping->i_pages, subreq->start, len);
+       err = copy_to_iter(iinfo->inline_data + subreq->start, len, &iter);
+       if (err == 0)
+               err = -EFAULT;
+
+       ceph_mdsc_put_request(req);
+out:
+       netfs_subreq_terminated(subreq, err, false);
+       return true;
+}
+
 static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
 {
        struct netfs_read_request *rreq = subreq->rreq;
-       struct inode *inode = rreq->mapping->host;
+       struct inode *inode = rreq->inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_osd_request *req;
@@ -258,6 +311,10 @@ static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
        int err = 0;
        u64 len = subreq->len;
 
+       if (ci->i_inline_version != CEPH_INLINE_NONE &&
+           ceph_netfs_issue_op_inline(subreq))
+               return;
+
        req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, subreq->start, &len,
                        0, 1, CEPH_OSD_OP_READ,
                        CEPH_OSD_FLAG_READ | fsc->client->osdc.client->options->read_from_replica,
@@ -326,23 +383,9 @@ static int ceph_readpage(struct file *file, struct page *subpage)
        size_t len = folio_size(folio);
        u64 off = folio_file_pos(folio);
 
-       if (ci->i_inline_version != CEPH_INLINE_NONE) {
-               /*
-                * Uptodate inline data should have been added
-                * into page cache while getting Fcr caps.
-                */
-               if (off == 0) {
-                       folio_unlock(folio);
-                       return -EINVAL;
-               }
-               zero_user_segment(&folio->page, 0, folio_size(folio));
-               folio_mark_uptodate(folio);
-               folio_unlock(folio);
-               return 0;
-       }
-
-       dout("readpage ino %llx.%llx file %p off %llu len %zu folio %p index %lu\n",
-            vino.ino, vino.snap, file, off, len, folio, folio_index(folio));
+       dout("readpage ino %llx.%llx file %p off %llu len %zu folio %p index %lu\n inline %d",
+            vino.ino, vino.snap, file, off, len, folio, folio_index(folio),
+            ci->i_inline_version != CEPH_INLINE_NONE);
 
        return netfs_readpage(file, folio, &ceph_netfs_read_ops, NULL);
 }
@@ -1281,45 +1324,11 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
                            struct page **pagep, void **fsdata)
 {
        struct inode *inode = file_inode(file);
-       struct ceph_inode_info *ci = ceph_inode(inode);
        struct folio *folio = NULL;
-       pgoff_t index = pos >> PAGE_SHIFT;
        int r;
 
-       /*
-        * Uninlining should have already been done and everything updated, EXCEPT
-        * for inline_version sent to the MDS.
-        */
-       if (ci->i_inline_version != CEPH_INLINE_NONE) {
-               unsigned int fgp_flags = FGP_LOCK | FGP_WRITE | FGP_CREAT | FGP_STABLE;
-               if (aop_flags & AOP_FLAG_NOFS)
-                       fgp_flags |= FGP_NOFS;
-               folio = __filemap_get_folio(mapping, index, fgp_flags,
-                                           mapping_gfp_mask(mapping));
-               if (!folio)
-                       return -ENOMEM;
-
-               /*
-                * The inline_version on a new inode is set to 1. If that's the
-                * case, then the folio is brand new and isn't yet Uptodate.
-                */
-               r = 0;
-               if (index == 0 && ci->i_inline_version != 1) {
-                       if (!folio_test_uptodate(folio)) {
-                               WARN_ONCE(1, "ceph: write_begin called on still-inlined inode (inline_version %llu)!\n",
-                                         ci->i_inline_version);
-                               r = -EINVAL;
-                       }
-                       goto out;
-               }
-               zero_user_segment(&folio->page, 0, folio_size(folio));
-               folio_mark_uptodate(folio);
-               goto out;
-       }
-
        r = netfs_write_begin(file, inode->i_mapping, pos, len, 0, &folio, NULL,
                              &ceph_netfs_read_ops, NULL);
-out:
        if (r == 0)
                folio_wait_fscache(folio);
        if (r < 0) {
@@ -1515,19 +1524,6 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
        sb_start_pagefault(inode->i_sb);
        ceph_block_sigs(&oldset);
 
-       if (ci->i_inline_version != CEPH_INLINE_NONE) {
-               struct page *locked_page = NULL;
-               if (off == 0) {
-                       lock_page(page);
-                       locked_page = page;
-               }
-               err = ceph_uninline_data(vma->vm_file, locked_page);
-               if (locked_page)
-                       unlock_page(locked_page);
-               if (err < 0)
-                       goto out_free;
-       }
-
        if (off + thp_size(page) <= size)
                len = thp_size(page);
        else
@@ -1584,11 +1580,9 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
                ceph_put_snap_context(snapc);
        } while (err == 0);
 
-       if (ret == VM_FAULT_LOCKED ||
-           ci->i_inline_version != CEPH_INLINE_NONE) {
+       if (ret == VM_FAULT_LOCKED) {
                int dirty;
                spin_lock(&ci->i_ceph_lock);
-               ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
                                               &prealloc_cf);
                spin_unlock(&ci->i_ceph_lock);
@@ -1652,16 +1646,30 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
        }
 }
 
-int ceph_uninline_data(struct file *filp, struct page *locked_page)
+int ceph_uninline_data(struct file *file)
 {
-       struct inode *inode = file_inode(filp);
+       struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_osd_request *req;
-       struct page *page = NULL;
-       u64 len, inline_version;
+       struct ceph_cap_flush *prealloc_cf;
+       struct folio *folio = NULL;
+       u64 inline_version = CEPH_INLINE_NONE;
+       struct page *pages[1];
        int err = 0;
-       bool from_pagecache = false;
+       u64 len;
+
+       prealloc_cf = ceph_alloc_cap_flush();
+       if (!prealloc_cf)
+               return -ENOMEM;
+
+       folio = read_mapping_folio(inode->i_mapping, 0, file);
+       if (IS_ERR(folio)) {
+               err = PTR_ERR(folio);
+               goto out;
+       }
+
+       folio_lock(folio);
 
        spin_lock(&ci->i_ceph_lock);
        inline_version = ci->i_inline_version;
@@ -1672,45 +1680,11 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 
        if (inline_version == 1 || /* initial version, no data */
            inline_version == CEPH_INLINE_NONE)
-               goto out;
-
-       if (locked_page) {
-               page = locked_page;
-               WARN_ON(!PageUptodate(page));
-       } else if (ceph_caps_issued(ci) &
-                  (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
-               page = find_get_page(inode->i_mapping, 0);
-               if (page) {
-                       if (PageUptodate(page)) {
-                               from_pagecache = true;
-                               lock_page(page);
-                       } else {
-                               put_page(page);
-                               page = NULL;
-                       }
-               }
-       }
+               goto out_unlock;
 
-       if (page) {
-               len = i_size_read(inode);
-               if (len > PAGE_SIZE)
-                       len = PAGE_SIZE;
-       } else {
-               page = __page_cache_alloc(GFP_NOFS);
-               if (!page) {
-                       err = -ENOMEM;
-                       goto out;
-               }
-               err = __ceph_do_getattr(inode, page,
-                                       CEPH_STAT_CAP_INLINE_DATA, true);
-               if (err < 0) {
-                       /* no inline data */
-                       if (err == -ENODATA)
-                               err = 0;
-                       goto out;
-               }
-               len = err;
-       }
+       len = i_size_read(inode);
+       if (len > folio_size(folio))
+               len = folio_size(folio);
 
        req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                    ceph_vino(inode), 0, &len, 0, 1,
@@ -1718,7 +1692,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
                                    NULL, 0, 0, false);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
-               goto out;
+               goto out_unlock;
        }
 
        req->r_mtime = inode->i_mtime;
@@ -1727,7 +1701,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
                err = ceph_osdc_wait_request(&fsc->client->osdc, req);
        ceph_osdc_put_request(req);
        if (err < 0)
-               goto out;
+               goto out_unlock;
 
        req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                    ceph_vino(inode), 0, &len, 1, 3,
@@ -1736,10 +1710,11 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
                                    ci->i_truncate_size, false);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
-               goto out;
+               goto out_unlock;
        }
 
-       osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
+       pages[0] = folio_page(folio, 0);
+       osd_req_op_extent_osd_data_pages(req, 1, pages, len, 0, false, false);
 
        {
                __le64 xattr_buf = cpu_to_le64(inline_version);
@@ -1749,7 +1724,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
                                            CEPH_OSD_CMPXATTR_OP_GT,
                                            CEPH_OSD_CMPXATTR_MODE_U64);
                if (err)
-                       goto out_put;
+                       goto out_put_req;
        }
 
        {
@@ -1760,7 +1735,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
                                            "inline_version",
                                            xattr_buf, xattr_len, 0, 0);
                if (err)
-                       goto out_put;
+                       goto out_put_req;
        }
 
        req->r_mtime = inode->i_mtime;
@@ -1771,19 +1746,28 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
        ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
                                  req->r_end_latency, len, err);
 
-out_put:
+       if (!err) {
+               int dirty;
+
+               /* Set to CAP_INLINE_NONE and dirty the caps */
+               down_read(&fsc->mdsc->snap_rwsem);
+               spin_lock(&ci->i_ceph_lock);
+               ci->i_inline_version = CEPH_INLINE_NONE;
+               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf);
+               spin_unlock(&ci->i_ceph_lock);
+               up_read(&fsc->mdsc->snap_rwsem);
+               if (dirty)
+                       __mark_inode_dirty(inode, dirty);
+       }
+out_put_req:
        ceph_osdc_put_request(req);
        if (err == -ECANCELED)
                err = 0;
+out_unlock:
+       folio_unlock(folio);
+       folio_put(folio);
 out:
-       if (page && page != locked_page) {
-               if (from_pagecache) {
-                       unlock_page(page);
-                       put_page(page);
-               } else
-                       __free_pages(page, 0);
-       }
-
+       ceph_free_cap_flush(prealloc_cf);
        dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
             inode, ceph_vinop(inode), inline_version, err);
        return err;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index b472cd0..f1ad688 100644
@@ -1915,6 +1915,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
                ceph_get_mds_session(session);
 
        spin_lock(&ci->i_ceph_lock);
+       if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
+               /* Don't send messages until we get async create reply */
+               spin_unlock(&ci->i_ceph_lock);
+               ceph_put_mds_session(session);
+               return;
+       }
+
        if (ci->i_ceph_flags & CEPH_I_FLUSH)
                flags |= CHECK_CAPS_FLUSH;
 retry:
@@ -2409,6 +2416,9 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
        dout("write_inode %p wait=%d\n", inode, wait);
        ceph_fscache_unpin_writeback(inode, wbc);
        if (wait) {
+               err = ceph_wait_on_async_create(inode);
+               if (err)
+                       return err;
                dirty = try_flush_caps(inode, &flush_tid);
                if (dirty)
                        err = wait_event_interruptible(ci->i_cap_wq,
@@ -2439,6 +2449,10 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
        u64 first_tid = 0;
        u64 last_snap_flush = 0;
 
+       /* Don't do anything until create reply comes in */
+       if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE)
+               return;
+
        ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
 
        list_for_each_entry_reverse(cf, &ci->i_cap_flush_list, i_list) {
@@ -4152,7 +4166,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 
        /* lookup ino */
        inode = ceph_find_inode(mdsc->fsc->sb, vino);
-       ci = ceph_inode(inode);
        dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
             vino.snap, inode);
 
@@ -4178,6 +4191,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                }
                goto flush_cap_releases;
        }
+       ci = ceph_inode(inode);
 
        /* these will work even if we don't have a cap yet */
        switch (op) {
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 3cf7c9c..bec3c45 100644
@@ -175,7 +175,7 @@ static int metrics_latency_show(struct seq_file *s, void *p)
        struct ceph_fs_client *fsc = s->private;
        struct ceph_client_metric *cm = &fsc->mdsc->metric;
        struct ceph_metric *m;
-       s64 total, sum, avg, min, max, sq;
+       s64 total, avg, min, max, sq;
        int i;
 
        seq_printf(s, "item          total       avg_lat(us)     min_lat(us)     max_lat(us)     stdev(us)\n");
@@ -185,8 +185,7 @@ static int metrics_latency_show(struct seq_file *s, void *p)
                m = &cm->metric[i];
                spin_lock(&m->lock);
                total = m->total;
-               sum = m->latency_sum;
-               avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+               avg = m->latency_avg;
                min = m->latency_min;
                max = m->latency_max;
                sq = m->latency_sq_sum;
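
For reference, the latency_sq_sum read above is a sum of squared deviations from the mean; turning it into the stdev(us) column conventionally means dividing by (total - 1) and taking a square root. A standalone userspace sketch of that conversion, assuming the kernel side does the equivalent with its integer div64/int_sqrt helpers:

        #include <math.h>
        #include <stdio.h>

        /* Sample standard deviation from a sum of squared deviations. */
        static double stdev_from_sq_sum(double sq_sum, long long count)
        {
                if (count <= 1)
                        return 0.0;
                return sqrt(sq_sum / (double)(count - 1));
        }

        int main(void)
        {
                /* e.g. 5 samples whose squared deviations sum to 1000 us^2 */
                printf("stdev ~ %.2f us\n", stdev_from_sq_sum(1000.0, 5));
                return 0;
        }
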
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 133dbd9..eae417d 100644
@@ -145,7 +145,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx,
                        return ERR_PTR(-EAGAIN);
                }
                /* reading/filling the cache are serialized by
-                  i_mutex, no need to use page lock */
+                  i_rwsem, no need to use page lock */
                unlock_page(cache_ctl->page);
                cache_ctl->dentries = kmap(cache_ctl->page);
        }
@@ -155,7 +155,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx,
        rcu_read_lock();
        spin_lock(&parent->d_lock);
        /* check i_size again here, because empty directory can be
-        * marked as complete while not holding the i_mutex. */
+        * marked as complete while not holding the i_rwsem. */
        if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
                dentry = cache_ctl->dentries[cache_ctl->index];
        else
@@ -478,8 +478,11 @@ more:
                                        2 : (fpos_off(rde->offset) + 1);
                        err = note_last_dentry(dfi, rde->name, rde->name_len,
                                               next_offset);
-                       if (err)
+                       if (err) {
+                               ceph_mdsc_put_request(dfi->last_readdir);
+                               dfi->last_readdir = NULL;
                                return err;
+                       }
                } else if (req->r_reply_info.dir_end) {
                        dfi->next_offset = 2;
                        /* keep last name */
@@ -520,6 +523,12 @@ more:
                if (!dir_emit(ctx, rde->name, rde->name_len,
                              ceph_present_ino(inode->i_sb, le64_to_cpu(rde->inode.in->ino)),
                              le32_to_cpu(rde->inode.in->mode) >> 12)) {
+                       /*
+                        * NOTE: Here no need to put the 'dfi->last_readdir',
+                        * because when dir_emit stops us it's most likely
+                        * doesn't have enough memory, etc. So for next readdir
+                        * it will continue.
+                        */
                        dout("filldir stopping us...\n");
                        return 0;
                }
@@ -671,7 +680,7 @@ struct dentry *ceph_handle_snapdir(struct ceph_mds_request *req,
                                   struct dentry *dentry)
 {
        struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
-       struct inode *parent = d_inode(dentry->d_parent); /* we hold i_mutex */
+       struct inode *parent = d_inode(dentry->d_parent); /* we hold i_rwsem */
 
        /* .snap dir? */
        if (ceph_snap(parent) == CEPH_NOSNAP &&
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index bbed322..feb75eb 100644
@@ -207,6 +207,7 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
        struct ceph_mount_options *opt =
                ceph_inode_to_client(&ci->vfs_inode)->mount_options;
        struct ceph_file_info *fi;
+       int ret;
 
        dout("%s %p %p 0%o (%s)\n", __func__, inode, file,
                        inode->i_mode, isdir ? "dir" : "regular");
@@ -240,7 +241,22 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
        INIT_LIST_HEAD(&fi->rw_contexts);
        fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen);
 
+       if ((file->f_mode & FMODE_WRITE) &&
+           ci->i_inline_version != CEPH_INLINE_NONE) {
+               ret = ceph_uninline_data(file);
+               if (ret < 0)
+                       goto error;
+       }
+
        return 0;
+
+error:
+       ceph_fscache_unuse_cookie(inode, file->f_mode & FMODE_WRITE);
+       ceph_put_fmode(ci, fi->fmode, 1);
+       kmem_cache_free(ceph_file_cachep, fi);
+       /* wake up anyone waiting for caps on this inode */
+       wake_up_all(&ci->i_cap_wq);
+       return ret;
 }
 
 /*
@@ -516,52 +532,67 @@ static void restore_deleg_ino(struct inode *dir, u64 ino)
        }
 }
 
+static void wake_async_create_waiters(struct inode *inode,
+                                     struct ceph_mds_session *session)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+
+       spin_lock(&ci->i_ceph_lock);
+       if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
+               ci->i_ceph_flags &= ~CEPH_I_ASYNC_CREATE;
+               wake_up_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT);
+       }
+       ceph_kick_flushing_inode_caps(session, ci);
+       spin_unlock(&ci->i_ceph_lock);
+}
+
 static void ceph_async_create_cb(struct ceph_mds_client *mdsc,
                                  struct ceph_mds_request *req)
 {
+       struct dentry *dentry = req->r_dentry;
+       struct inode *dinode = d_inode(dentry);
+       struct inode *tinode = req->r_target_inode;
        int result = req->r_err ? req->r_err :
                        le32_to_cpu(req->r_reply_info.head->result);
 
+       WARN_ON_ONCE(dinode && tinode && dinode != tinode);
+
+       /* MDS changed -- caller must resubmit */
        if (result == -EJUKEBOX)
                goto out;
 
        mapping_set_error(req->r_parent->i_mapping, result);
 
        if (result) {
-               struct dentry *dentry = req->r_dentry;
-               struct inode *inode = d_inode(dentry);
                int pathlen = 0;
                u64 base = 0;
                char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
                                                  &base, 0);
 
+               pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
+                       base, IS_ERR(path) ? "<<bad>>" : path, result);
+               ceph_mdsc_free_path(path, pathlen);
+
                ceph_dir_clear_complete(req->r_parent);
                if (!d_unhashed(dentry))
                        d_drop(dentry);
 
-               ceph_inode_shutdown(inode);
-
-               pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
-                       base, IS_ERR(path) ? "<<bad>>" : path, result);
-               ceph_mdsc_free_path(path, pathlen);
+               if (dinode) {
+                       mapping_set_error(dinode->i_mapping, result);
+                       ceph_inode_shutdown(dinode);
+                       wake_async_create_waiters(dinode, req->r_session);
+               }
        }
 
-       if (req->r_target_inode) {
-               struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
-               u64 ino = ceph_vino(req->r_target_inode).ino;
+       if (tinode) {
+               u64 ino = ceph_vino(tinode).ino;
 
                if (req->r_deleg_ino != ino)
                        pr_warn("%s: inode number mismatch! err=%d deleg_ino=0x%llx target=0x%llx\n",
                                __func__, req->r_err, req->r_deleg_ino, ino);
-               mapping_set_error(req->r_target_inode->i_mapping, result);
 
-               spin_lock(&ci->i_ceph_lock);
-               if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
-                       ci->i_ceph_flags &= ~CEPH_I_ASYNC_CREATE;
-                       wake_up_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT);
-               }
-               ceph_kick_flushing_inode_caps(req->r_session, ci);
-               spin_unlock(&ci->i_ceph_lock);
+               mapping_set_error(tinode->i_mapping, result);
+               wake_async_create_waiters(tinode, req->r_session);
        } else if (!result) {
                pr_warn("%s: no req->r_target_inode for 0x%llx\n", __func__,
                        req->r_deleg_ino);
@@ -1041,7 +1072,6 @@ static void ceph_aio_complete(struct inode *inode,
                }
 
                spin_lock(&ci->i_ceph_lock);
-               ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
                                               &aio_req->prealloc_cf);
                spin_unlock(&ci->i_ceph_lock);
@@ -1778,12 +1808,6 @@ retry_snap:
        if (err)
                goto out;
 
-       if (ci->i_inline_version != CEPH_INLINE_NONE) {
-               err = ceph_uninline_data(file, NULL);
-               if (err < 0)
-                       goto out;
-       }
-
        dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
             inode, ceph_vinop(inode), pos, count, i_size_read(inode));
        if (!(fi->flags & CEPH_F_SYNC) && !direct_lock)
@@ -1855,7 +1879,6 @@ retry_snap:
                int dirty;
 
                spin_lock(&ci->i_ceph_lock);
-               ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
                                               &prealloc_cf);
                spin_unlock(&ci->i_ceph_lock);
@@ -2109,12 +2132,6 @@ static long ceph_fallocate(struct file *file, int mode,
                goto unlock;
        }
 
-       if (ci->i_inline_version != CEPH_INLINE_NONE) {
-               ret = ceph_uninline_data(file, NULL);
-               if (ret < 0)
-                       goto unlock;
-       }
-
        size = i_size_read(inode);
 
        /* Are we punching a hole beyond EOF? */
@@ -2139,7 +2156,6 @@ static long ceph_fallocate(struct file *file, int mode,
 
        if (!ret) {
                spin_lock(&ci->i_ceph_lock);
-               ci->i_inline_version = CEPH_INLINE_NONE;
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
                                               &prealloc_cf);
                spin_unlock(&ci->i_ceph_lock);
@@ -2532,7 +2548,6 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
        }
        /* Mark Fw dirty */
        spin_lock(&dst_ci->i_ceph_lock);
-       dst_ci->i_inline_version = CEPH_INLINE_NONE;
        dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
        spin_unlock(&dst_ci->i_ceph_lock);
        if (dirty)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 9cfa6c7..d80911d 100644
@@ -87,13 +87,13 @@ struct inode *ceph_get_snapdir(struct inode *parent)
        if (!S_ISDIR(parent->i_mode)) {
                pr_warn_once("bad snapdir parent type (mode=0%o)\n",
                             parent->i_mode);
-               return ERR_PTR(-ENOTDIR);
+               goto err;
        }
 
        if (!(inode->i_state & I_NEW) && !S_ISDIR(inode->i_mode)) {
                pr_warn_once("bad snapdir inode type (mode=0%o)\n",
                             inode->i_mode);
-               return ERR_PTR(-ENOTDIR);
+               goto err;
        }
 
        inode->i_mode = parent->i_mode;
@@ -113,6 +113,12 @@ struct inode *ceph_get_snapdir(struct inode *parent)
        }
 
        return inode;
+err:
+       if ((inode->i_state & I_NEW))
+               discard_new_inode(inode);
+       else
+               iput(inode);
+       return ERR_PTR(-ENOTDIR);
 }
 
 const struct inode_operations ceph_file_iops = {
@@ -1201,7 +1207,7 @@ out_unlock:
 
 /*
  * splice a dentry to an inode.
- * caller must hold directory i_mutex for this to be safe.
+ * caller must hold directory i_rwsem for this to be safe.
  */
 static int splice_dentry(struct dentry **pdn, struct inode *in)
 {
@@ -1598,7 +1604,7 @@ static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
                        return idx == 0 ? -ENOMEM : 0;
                }
                /* reading/filling the cache are serialized by
-                * i_mutex, no need to use page lock */
+                * i_rwsem, no need to use page lock */
                unlock_page(ctl->page);
                ctl->dentries = kmap(ctl->page);
                if (idx == 0)
@@ -2301,6 +2307,57 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
        return err;
 }
 
+int ceph_do_getvxattr(struct inode *inode, const char *name, void *value,
+                     size_t size)
+{
+       struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
+       struct ceph_mds_request *req;
+       int mode = USE_AUTH_MDS;
+       int err;
+       char *xattr_value;
+       size_t xattr_value_len;
+
+       req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETVXATTR, mode);
+       if (IS_ERR(req)) {
+               err = -ENOMEM;
+               goto out;
+       }
+
+       req->r_path2 = kstrdup(name, GFP_NOFS);
+       if (!req->r_path2) {
+               err = -ENOMEM;
+               goto put;
+       }
+
+       ihold(inode);
+       req->r_inode = inode;
+       err = ceph_mdsc_do_request(mdsc, NULL, req);
+       if (err < 0)
+               goto put;
+
+       xattr_value = req->r_reply_info.xattr_info.xattr_value;
+       xattr_value_len = req->r_reply_info.xattr_info.xattr_value_len;
+
+       dout("do_getvxattr xattr_value_len:%zu, size:%zu\n", xattr_value_len, size);
+
+       err = (int)xattr_value_len;
+       if (size == 0)
+               goto put;
+
+       if (xattr_value_len > size) {
+               err = -ERANGE;
+               goto put;
+       }
+
+       memcpy(value, xattr_value, xattr_value_len);
+put:
+       ceph_mdsc_put_request(req);
+out:
+       dout("do_getvxattr result=%d\n", err);
+       return err;
+}
+
 
 /*
  * Check inode permissions.  We verify we have a valid value for
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index d1f154a..3e2843e 100644
@@ -111,10 +111,10 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode,
        req->r_args.filelock_change.length = cpu_to_le64(length);
        req->r_args.filelock_change.wait = wait;
 
-       if (wait)
-               req->r_wait_for_completion = ceph_lock_wait_for_completion;
-
-       err = ceph_mdsc_do_request(mdsc, inode, req);
+       err = ceph_mdsc_submit_request(mdsc, inode, req);
+       if (!err)
+               err = ceph_mdsc_wait_request(mdsc, req, wait ?
+                                       ceph_lock_wait_for_completion : NULL);
        if (!err && operation == CEPH_MDS_OP_GETFILELOCK) {
                fl->fl_pid = -le64_to_cpu(req->r_reply_info.filelock_reply->pid);
                if (CEPH_LOCK_SHARED == req->r_reply_info.filelock_reply->type)
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index c30eefc..fa38c01 100644
@@ -555,6 +555,28 @@ bad:
        return -EIO;
 }
 
+static int parse_reply_info_getvxattr(void **p, void *end,
+                                     struct ceph_mds_reply_info_parsed *info,
+                                     u64 features)
+{
+       u32 value_len;
+
+       ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
+       ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
+       ceph_decode_skip_32(p, end, bad); /* skip payload length */
+
+       ceph_decode_32_safe(p, end, value_len, bad);
+
+       if (value_len == end - *p) {
+         info->xattr_info.xattr_value = *p;
+         info->xattr_info.xattr_value_len = value_len;
+         *p = end;
+         return value_len;
+       }
+bad:
+       return -EIO;
+}
+
 /*
  * parse extra results
  */
@@ -570,6 +592,8 @@ static int parse_reply_info_extra(void **p, void *end,
                return parse_reply_info_readdir(p, end, info, features);
        else if (op == CEPH_MDS_OP_CREATE)
                return parse_reply_info_create(p, end, info, features, s);
+       else if (op == CEPH_MDS_OP_GETVXATTR)
+               return parse_reply_info_getvxattr(p, end, info, features);
        else
                return -EIO;
 }
@@ -2178,7 +2202,8 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
        order = get_order(size * num_entries);
        while (order >= 0) {
                rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
-                                                            __GFP_NOWARN,
+                                                            __GFP_NOWARN |
+                                                            __GFP_ZERO,
                                                             order);
                if (rinfo->dir_entries)
                        break;
@@ -2946,15 +2971,16 @@ int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
        return err;
 }
 
-static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
-                                 struct ceph_mds_request *req)
+int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
+                          struct ceph_mds_request *req,
+                          ceph_mds_request_wait_callback_t wait_func)
 {
        int err;
 
        /* wait */
        dout("do_request waiting\n");
-       if (!req->r_timeout && req->r_wait_for_completion) {
-               err = req->r_wait_for_completion(mdsc, req);
+       if (wait_func) {
+               err = wait_func(mdsc, req);
        } else {
                long timeleft = wait_for_completion_killable_timeout(
                                        &req->r_completion,
@@ -3011,7 +3037,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
        /* issue */
        err = ceph_mdsc_submit_request(mdsc, dir, req);
        if (!err)
-               err = ceph_mdsc_wait_request(mdsc, req);
+               err = ceph_mdsc_wait_request(mdsc, req, NULL);
        dout("do_request %p done, result %d\n", req, err);
        return err;
 }
@@ -3097,35 +3123,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 
        result = le32_to_cpu(head->result);
 
-       /*
-        * Handle an ESTALE
-        * if we're not talking to the authority, send to them
-        * if the authority has changed while we weren't looking,
-        * send to new authority
-        * Otherwise we just have to return an ESTALE
-        */
-       if (result == -ESTALE) {
-               dout("got ESTALE on request %llu\n", req->r_tid);
-               req->r_resend_mds = -1;
-               if (req->r_direct_mode != USE_AUTH_MDS) {
-                       dout("not using auth, setting for that now\n");
-                       req->r_direct_mode = USE_AUTH_MDS;
-                       __do_request(mdsc, req);
-                       mutex_unlock(&mdsc->mutex);
-                       goto out;
-               } else  {
-                       int mds = __choose_mds(mdsc, req, NULL);
-                       if (mds >= 0 && mds != req->r_session->s_mds) {
-                               dout("but auth changed, so resending\n");
-                               __do_request(mdsc, req);
-                               mutex_unlock(&mdsc->mutex);
-                               goto out;
-                       }
-               }
-               dout("have to return ESTALE on request %llu\n", req->r_tid);
-       }
-
-
        if (head->safe) {
                set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
                __unregister_request(mdsc, req);
@@ -4841,7 +4838,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
        mutex_unlock(&mdsc->mutex);
 
        ceph_cleanup_snapid_map(mdsc);
-       ceph_cleanup_empty_realms(mdsc);
+       ceph_cleanup_global_and_empty_realms(mdsc);
 
        cancel_work_sync(&mdsc->cap_reclaim_work);
        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
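
A note on the getvxattr reply decoding added above: the parser skips a one-byte version, a one-byte compat version and a 32-bit payload length, then expects a 32-bit value length followed by exactly that many bytes filling the remainder of the payload. A standalone sketch of the same little-endian layout (illustrative only; the field names are assumptions, and the kernel code uses the ceph_decode_* helpers with bounds checks):

        #include <stdint.h>
        #include <stdio.h>
        #include <stddef.h>

        /*
         * Sketch of the getvxattr reply payload decoded above:
         *   u8 version, u8 compat, le32 payload_len, le32 value_len, value[].
         * Returns the value length, or -1 if the buffer is malformed.
         */
        static int decode_getvxattr_value(const uint8_t *p, size_t len,
                                          const uint8_t **value)
        {
                uint32_t value_len;

                if (len < 2 + 4 + 4)
                        return -1;
                p += 2 + 4;             /* skip version, compat, payload len */
                value_len = (uint32_t)p[0] | ((uint32_t)p[1] << 8) |
                            ((uint32_t)p[2] << 16) | ((uint32_t)p[3] << 24);
                p += 4;
                if (value_len != len - (2 + 4 + 4))
                        return -1;      /* value must fill the rest, as above */
                *value = p;
                return (int)value_len;
        }

        int main(void)
        {
                /* version=1, compat=1, payload_len=7, value_len=3, "bar" */
                uint8_t buf[] = { 1, 1, 7, 0, 0, 0, 3, 0, 0, 0, 'b', 'a', 'r' };
                const uint8_t *val;
                int n = decode_getvxattr_value(buf, sizeof(buf), &val);

                printf("len=%d value=%.*s\n", n, n, (const char *)val);
                return 0;
        }
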
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 97c7f7b..3349784 100644
@@ -100,6 +100,11 @@ struct ceph_mds_reply_dir_entry {
        loff_t                        offset;
 };
 
+struct ceph_mds_reply_xattr {
+       char *xattr_value;
+       size_t xattr_value_len;
+};
+
 /*
  * parsed info about an mds reply, including information about
  * either: 1) the target inode and/or its parent directory and dentry,
@@ -115,6 +120,7 @@ struct ceph_mds_reply_info_parsed {
        char                          *dname;
        u32                           dname_len;
        struct ceph_mds_reply_lease   *dlease;
+       struct ceph_mds_reply_xattr   xattr_info;
 
        /* extra */
        union {
@@ -274,8 +280,8 @@ struct ceph_mds_request {
 
        union ceph_mds_request_args r_args;
        int r_fmode;        /* file mode, if expecting cap */
-       const struct cred *r_cred;
        int r_request_release_offset;
+       const struct cred *r_cred;
        struct timespec64 r_stamp;
 
        /* for choosing which mds to send this request to */
@@ -296,12 +302,11 @@ struct ceph_mds_request {
        struct ceph_msg  *r_reply;
        struct ceph_mds_reply_info_parsed r_reply_info;
        int r_err;
-
+       u32               r_readdir_offset;
 
        struct page *r_locked_page;
        int r_dir_caps;
        int r_num_caps;
-       u32               r_readdir_offset;
 
        unsigned long r_timeout;  /* optional.  jiffies, 0 is "wait forever" */
        unsigned long r_started;  /* start time to measure timeout against */
@@ -329,7 +334,6 @@ struct ceph_mds_request {
        struct completion r_completion;
        struct completion r_safe_completion;
        ceph_mds_request_callback_t r_callback;
-       ceph_mds_request_wait_callback_t r_wait_for_completion;
        struct list_head  r_unsafe_item;  /* per-session unsafe list item */
 
        long long         r_dir_release_cnt;
@@ -507,6 +511,9 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode);
 extern int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
                                    struct inode *dir,
                                    struct ceph_mds_request *req);
+int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
+                       struct ceph_mds_request *req,
+                       ceph_mds_request_wait_callback_t wait_func);
 extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
                                struct inode *dir,
                                struct ceph_mds_request *req);
diff --git a/fs/ceph/metric.c b/fs/ceph/metric.c
index 0fcba68..c47347d 100644
@@ -8,6 +8,12 @@
 #include "metric.h"
 #include "mds_client.h"
 
+static void ktime_to_ceph_timespec(struct ceph_timespec *ts, ktime_t val)
+{
+       struct timespec64 t = ktime_to_timespec64(val);
+       ceph_encode_timespec64(ts, &t);
+}
+
 static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
                                   struct ceph_mds_session *s)
 {
@@ -26,7 +32,6 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
        u64 nr_caps = atomic64_read(&m->total_caps);
        u32 header_len = sizeof(struct ceph_metric_header);
        struct ceph_msg *msg;
-       struct timespec64 ts;
        s64 sum;
        s32 items = 0;
        s32 len;
@@ -59,37 +64,40 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
        /* encode the read latency metric */
        read = (struct ceph_metric_read_latency *)(cap + 1);
        read->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY);
-       read->header.ver = 1;
+       read->header.ver = 2;
        read->header.compat = 1;
        read->header.data_len = cpu_to_le32(sizeof(*read) - header_len);
        sum = m->metric[METRIC_READ].latency_sum;
-       jiffies_to_timespec64(sum, &ts);
-       read->sec = cpu_to_le32(ts.tv_sec);
-       read->nsec = cpu_to_le32(ts.tv_nsec);
+       ktime_to_ceph_timespec(&read->lat, sum);
+       ktime_to_ceph_timespec(&read->avg, m->metric[METRIC_READ].latency_avg);
+       read->sq_sum = cpu_to_le64(m->metric[METRIC_READ].latency_sq_sum);
+       read->count = cpu_to_le64(m->metric[METRIC_READ].total);
        items++;
 
        /* encode the write latency metric */
        write = (struct ceph_metric_write_latency *)(read + 1);
        write->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY);
-       write->header.ver = 1;
+       write->header.ver = 2;
        write->header.compat = 1;
        write->header.data_len = cpu_to_le32(sizeof(*write) - header_len);
        sum = m->metric[METRIC_WRITE].latency_sum;
-       jiffies_to_timespec64(sum, &ts);
-       write->sec = cpu_to_le32(ts.tv_sec);
-       write->nsec = cpu_to_le32(ts.tv_nsec);
+       ktime_to_ceph_timespec(&write->lat, sum);
+       ktime_to_ceph_timespec(&write->avg, m->metric[METRIC_WRITE].latency_avg);
+       write->sq_sum = cpu_to_le64(m->metric[METRIC_WRITE].latency_sq_sum);
+       write->count = cpu_to_le64(m->metric[METRIC_WRITE].total);
        items++;
 
        /* encode the metadata latency metric */
        meta = (struct ceph_metric_metadata_latency *)(write + 1);
        meta->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY);
-       meta->header.ver = 1;
+       meta->header.ver = 2;
        meta->header.compat = 1;
        meta->header.data_len = cpu_to_le32(sizeof(*meta) - header_len);
        sum = m->metric[METRIC_METADATA].latency_sum;
-       jiffies_to_timespec64(sum, &ts);
-       meta->sec = cpu_to_le32(ts.tv_sec);
-       meta->nsec = cpu_to_le32(ts.tv_nsec);
+       ktime_to_ceph_timespec(&meta->lat, sum);
+       ktime_to_ceph_timespec(&meta->avg, m->metric[METRIC_METADATA].latency_avg);
+       meta->sq_sum = cpu_to_le64(m->metric[METRIC_METADATA].latency_sq_sum);
+       meta->count = cpu_to_le64(m->metric[METRIC_METADATA].total);
        items++;
 
        /* encode the dentry lease metric */
@@ -250,6 +258,7 @@ int ceph_metric_init(struct ceph_client_metric *m)
                metric->size_max = 0;
                metric->total = 0;
                metric->latency_sum = 0;
+               metric->latency_avg = 0;
                metric->latency_sq_sum = 0;
                metric->latency_min = KTIME_MAX;
                metric->latency_max = 0;
@@ -307,20 +316,19 @@ void ceph_metric_destroy(struct ceph_client_metric *m)
                max = new;                      \
 }
 
-static inline void __update_stdev(ktime_t total, ktime_t lsum,
-                                 ktime_t *sq_sump, ktime_t lat)
+static inline void __update_mean_and_stdev(ktime_t total, ktime_t *lavg,
+                                          ktime_t *sq_sump, ktime_t lat)
 {
-       ktime_t avg, sq;
-
-       if (unlikely(total == 1))
-               return;
-
-       /* the sq is (lat - old_avg) * (lat - new_avg) */
-       avg = DIV64_U64_ROUND_CLOSEST((lsum - lat), (total - 1));
-       sq = lat - avg;
-       avg = DIV64_U64_ROUND_CLOSEST(lsum, total);
-       sq = sq * (lat - avg);
-       *sq_sump += sq;
+       ktime_t avg;
+
+       if (unlikely(total == 1)) {
+               *lavg = lat;
+       } else {
+               /* the sq is (lat - old_avg) * (lat - new_avg) */
+               avg = *lavg + div64_s64(lat - *lavg, total);
+               *sq_sump += (lat - *lavg)*(lat - avg);
+               *lavg = avg;
+       }
 }
 
 void ceph_update_metrics(struct ceph_metric *m,
@@ -339,6 +347,7 @@ void ceph_update_metrics(struct ceph_metric *m,
        METRIC_UPDATE_MIN_MAX(m->size_min, m->size_max, size);
        m->latency_sum += lat;
        METRIC_UPDATE_MIN_MAX(m->latency_min, m->latency_max, lat);
-       __update_stdev(total, m->latency_sum, &m->latency_sq_sum, lat);
+       __update_mean_and_stdev(total, &m->latency_avg, &m->latency_sq_sum,
+                               lat);
        spin_unlock(&m->lock);
 }
diff --git a/fs/ceph/metric.h b/fs/ceph/metric.h
index bb45608..0d0c44b 100644
@@ -2,7 +2,7 @@
 #ifndef _FS_CEPH_MDS_METRIC_H
 #define _FS_CEPH_MDS_METRIC_H
 
-#include <linux/types.h>
+#include <linux/ceph/types.h>
 #include <linux/percpu_counter.h>
 #include <linux/ktime.h>
 
@@ -19,27 +19,39 @@ enum ceph_metric_type {
        CLIENT_METRIC_TYPE_OPENED_INODES,
        CLIENT_METRIC_TYPE_READ_IO_SIZES,
        CLIENT_METRIC_TYPE_WRITE_IO_SIZES,
-
-       CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_WRITE_IO_SIZES,
+       CLIENT_METRIC_TYPE_AVG_READ_LATENCY,
+       CLIENT_METRIC_TYPE_STDEV_READ_LATENCY,
+       CLIENT_METRIC_TYPE_AVG_WRITE_LATENCY,
+       CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY,
+       CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY,
+       CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY,
+
+       CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY,
 };
 
 /*
  * This will always have the highest metric bit value
  * as the last element of the array.
  */
-#define CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED {  \
-       CLIENT_METRIC_TYPE_CAP_INFO,            \
-       CLIENT_METRIC_TYPE_READ_LATENCY,        \
-       CLIENT_METRIC_TYPE_WRITE_LATENCY,       \
-       CLIENT_METRIC_TYPE_METADATA_LATENCY,    \
-       CLIENT_METRIC_TYPE_DENTRY_LEASE,        \
-       CLIENT_METRIC_TYPE_OPENED_FILES,        \
-       CLIENT_METRIC_TYPE_PINNED_ICAPS,        \
-       CLIENT_METRIC_TYPE_OPENED_INODES,       \
-       CLIENT_METRIC_TYPE_READ_IO_SIZES,       \
-       CLIENT_METRIC_TYPE_WRITE_IO_SIZES,      \
-                                               \
-       CLIENT_METRIC_TYPE_MAX,                 \
+#define CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED {     \
+       CLIENT_METRIC_TYPE_CAP_INFO,               \
+       CLIENT_METRIC_TYPE_READ_LATENCY,           \
+       CLIENT_METRIC_TYPE_WRITE_LATENCY,          \
+       CLIENT_METRIC_TYPE_METADATA_LATENCY,       \
+       CLIENT_METRIC_TYPE_DENTRY_LEASE,           \
+       CLIENT_METRIC_TYPE_OPENED_FILES,           \
+       CLIENT_METRIC_TYPE_PINNED_ICAPS,           \
+       CLIENT_METRIC_TYPE_OPENED_INODES,          \
+       CLIENT_METRIC_TYPE_READ_IO_SIZES,          \
+       CLIENT_METRIC_TYPE_WRITE_IO_SIZES,         \
+       CLIENT_METRIC_TYPE_AVG_READ_LATENCY,       \
+       CLIENT_METRIC_TYPE_STDEV_READ_LATENCY,     \
+       CLIENT_METRIC_TYPE_AVG_WRITE_LATENCY,      \
+       CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY,    \
+       CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY,   \
+       CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, \
+                                                  \
+       CLIENT_METRIC_TYPE_MAX,                    \
 }
 
 struct ceph_metric_header {
@@ -60,22 +72,28 @@ struct ceph_metric_cap {
 /* metric read latency header */
 struct ceph_metric_read_latency {
        struct ceph_metric_header header;
-       __le32 sec;
-       __le32 nsec;
+       struct ceph_timespec lat;
+       struct ceph_timespec avg;
+       __le64 sq_sum;
+       __le64 count;
 } __packed;
 
 /* metric write latency header */
 struct ceph_metric_write_latency {
        struct ceph_metric_header header;
-       __le32 sec;
-       __le32 nsec;
+       struct ceph_timespec lat;
+       struct ceph_timespec avg;
+       __le64 sq_sum;
+       __le64 count;
 } __packed;
 
 /* metric metadata latency header */
 struct ceph_metric_metadata_latency {
        struct ceph_metric_header header;
-       __le32 sec;
-       __le32 nsec;
+       struct ceph_timespec lat;
+       struct ceph_timespec avg;
+       __le64 sq_sum;
+       __le64 count;
 } __packed;
 
 /* metric dentry lease header */
@@ -140,6 +158,7 @@ struct ceph_metric {
        u64 size_min;
        u64 size_max;
        ktime_t latency_sum;
+       ktime_t latency_avg;
        ktime_t latency_sq_sum;
        ktime_t latency_min;
        ktime_t latency_max;
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index b41e672..322ee5a 100644
@@ -121,18 +121,23 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
        if (!realm)
                return ERR_PTR(-ENOMEM);
 
-       atomic_set(&realm->nref, 1);    /* for caller */
+       /* Do not release the global dummy snaprealm until unmouting */
+       if (ino == CEPH_INO_GLOBAL_SNAPREALM)
+               atomic_set(&realm->nref, 2);
+       else
+               atomic_set(&realm->nref, 1);
        realm->ino = ino;
        INIT_LIST_HEAD(&realm->children);
        INIT_LIST_HEAD(&realm->child_item);
        INIT_LIST_HEAD(&realm->empty_item);
        INIT_LIST_HEAD(&realm->dirty_item);
+       INIT_LIST_HEAD(&realm->rebuild_item);
        INIT_LIST_HEAD(&realm->inodes_with_caps);
        spin_lock_init(&realm->inodes_with_caps_lock);
        __insert_snap_realm(&mdsc->snap_realms, realm);
        mdsc->num_snap_realms++;
 
-       dout("create_snap_realm %llx %p\n", realm->ino, realm);
+       dout("%s %llx %p\n", __func__, realm->ino, realm);
        return realm;
 }
 
@@ -156,7 +161,7 @@ static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
                else if (ino > r->ino)
                        n = n->rb_right;
                else {
-                       dout("lookup_snap_realm %llx %p\n", r->ino, r);
+                       dout("%s %llx %p\n", __func__, r->ino, r);
                        return r;
                }
        }
@@ -184,7 +189,7 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
 {
        lockdep_assert_held_write(&mdsc->snap_rwsem);
 
-       dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);
+       dout("%s %p %llx\n", __func__, realm, realm->ino);
 
        rb_erase(&realm->node, &mdsc->snap_realms);
        mdsc->num_snap_realms--;
@@ -260,9 +265,14 @@ static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
        spin_unlock(&mdsc->snap_empty_lock);
 }
 
-void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc)
+void ceph_cleanup_global_and_empty_realms(struct ceph_mds_client *mdsc)
 {
+       struct ceph_snap_realm *global_realm;
+
        down_write(&mdsc->snap_rwsem);
+       global_realm = __lookup_snap_realm(mdsc, CEPH_INO_GLOBAL_SNAPREALM);
+       if (global_realm)
+               ceph_put_snap_realm(mdsc, global_realm);
        __cleanup_empty_realms(mdsc);
        up_write(&mdsc->snap_rwsem);
 }
@@ -292,9 +302,8 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
                if (IS_ERR(parent))
                        return PTR_ERR(parent);
        }
-       dout("adjust_snap_realm_parent %llx %p: %llx %p -> %llx %p\n",
-            realm->ino, realm, realm->parent_ino, realm->parent,
-            parentino, parent);
+       dout("%s %llx %p: %llx %p -> %llx %p\n", __func__, realm->ino,
+            realm, realm->parent_ino, realm->parent, parentino, parent);
        if (realm->parent) {
                list_del_init(&realm->child_item);
                ceph_put_snap_realm(mdsc, realm->parent);
@@ -320,7 +329,8 @@ static int cmpu64_rev(const void *a, const void *b)
  * build the snap context for a given realm.
  */
 static int build_snap_context(struct ceph_snap_realm *realm,
-                             struct list_head* dirty_realms)
+                             struct list_head *realm_queue,
+                             struct list_head *dirty_realms)
 {
        struct ceph_snap_realm *parent = realm->parent;
        struct ceph_snap_context *snapc;
@@ -334,9 +344,9 @@ static int build_snap_context(struct ceph_snap_realm *realm,
         */
        if (parent) {
                if (!parent->cached_context) {
-                       err = build_snap_context(parent, dirty_realms);
-                       if (err)
-                               goto fail;
+                       /* add to the queue head */
+                       list_add(&parent->rebuild_item, realm_queue);
+                       return 1;
                }
                num += parent->cached_context->num_snaps;
        }
@@ -349,9 +359,8 @@ static int build_snap_context(struct ceph_snap_realm *realm,
            realm->cached_context->seq == realm->seq &&
            (!parent ||
             realm->cached_context->seq >= parent->cached_context->seq)) {
-               dout("build_snap_context %llx %p: %p seq %lld (%u snaps)"
-                    " (unchanged)\n",
-                    realm->ino, realm, realm->cached_context,
+               dout("%s %llx %p: %p seq %lld (%u snaps) (unchanged)\n",
+                    __func__, realm->ino, realm, realm->cached_context,
                     realm->cached_context->seq,
                     (unsigned int)realm->cached_context->num_snaps);
                return 0;
@@ -390,9 +399,8 @@ static int build_snap_context(struct ceph_snap_realm *realm,
 
        sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
        snapc->num_snaps = num;
-       dout("build_snap_context %llx %p: %p seq %lld (%u snaps)\n",
-            realm->ino, realm, snapc, snapc->seq,
-            (unsigned int) snapc->num_snaps);
+       dout("%s %llx %p: %p seq %lld (%u snaps)\n", __func__, realm->ino,
+            realm, snapc, snapc->seq, (unsigned int) snapc->num_snaps);
 
        ceph_put_snap_context(realm->cached_context);
        realm->cached_context = snapc;
@@ -409,8 +417,7 @@ fail:
                ceph_put_snap_context(realm->cached_context);
                realm->cached_context = NULL;
        }
-       pr_err("build_snap_context %llx %p fail %d\n", realm->ino,
-              realm, err);
+       pr_err("%s %llx %p fail %d\n", __func__, realm->ino, realm, err);
        return err;
 }
 
@@ -420,13 +427,50 @@ fail:
 static void rebuild_snap_realms(struct ceph_snap_realm *realm,
                                struct list_head *dirty_realms)
 {
-       struct ceph_snap_realm *child;
+       LIST_HEAD(realm_queue);
+       int last = 0;
+       bool skip = false;
+
+       list_add_tail(&realm->rebuild_item, &realm_queue);
+
+       while (!list_empty(&realm_queue)) {
+               struct ceph_snap_realm *_realm, *child;
+
+               _realm = list_first_entry(&realm_queue,
+                                         struct ceph_snap_realm,
+                                         rebuild_item);
+
+               /*
+                * If the last build attempt failed due to a memory
+                * allocation failure, just empty the realm_queue and
+                * return to avoid an infinite loop.
+                */
+               if (last < 0) {
+                       list_del_init(&_realm->rebuild_item);
+                       continue;
+               }
+
+               last = build_snap_context(_realm, &realm_queue, dirty_realms);
+               dout("%s %llx %p, %s\n", __func__, _realm->ino, _realm,
+                    last > 0 ? "is deferred" : !last ? "succeeded" : "failed");
+
+               /* is any child already in the realm_queue? */
+               list_for_each_entry(child, &_realm->children, child_item) {
+                       if (!list_empty(&child->rebuild_item)) {
+                               skip = true;
+                               break;
+                       }
+               }
 
-       dout("rebuild_snap_realms %llx %p\n", realm->ino, realm);
-       build_snap_context(realm, dirty_realms);
+               if (!skip) {
+                       list_for_each_entry(child, &_realm->children, child_item)
+                               list_add_tail(&child->rebuild_item, &realm_queue);
+               }
 
-       list_for_each_entry(child, &realm->children, child_item)
-               rebuild_snap_realms(child, dirty_realms);
+               /* last == 1 means the parent needs to be built first */
+               if (last <= 0)
+                       list_del_init(&_realm->rebuild_item);
+       }
 }
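
The hunk above replaces the recursive context rebuild with an explicit work queue: a realm whose parent has no cached context is deferred behind that parent (build_snap_context() returns 1 after pushing the parent to the queue head), children are queued only once their parent is handled, and a hard error simply drains the queue. Below is a minimal, userspace-only C sketch of the same traversal pattern; struct node, build_one() and the queue helpers are invented for the example and are not the kernel data structures.

/*
 * Illustrative userspace sketch of the queue-based traversal used in the
 * hunk above; not the kernel implementation.
 */
#include <stdbool.h>
#include <stdio.h>

struct node {
	int id;
	bool built;		/* stands in for realm->cached_context */
	bool queued;		/* mirrors !list_empty(&rebuild_item) */
	struct node *parent;
	struct node *child;	/* a single child keeps the sketch short */
	struct node *next;	/* queue linkage */
};

struct queue { struct node *head, *tail; };

static void push_head(struct queue *q, struct node *n)
{
	n->queued = true;
	n->next = q->head;
	q->head = n;
	if (!q->tail)
		q->tail = n;
}

static void push_tail(struct queue *q, struct node *n)
{
	n->queued = true;
	n->next = NULL;
	if (q->tail)
		q->tail->next = n;
	else
		q->head = n;
	q->tail = n;
}

static struct node *pop_head(struct queue *q)
{
	struct node *n = q->head;

	q->head = n->next;
	if (!q->head)
		q->tail = NULL;
	n->next = NULL;
	n->queued = false;
	return n;
}

/* Return 1 to defer behind an unbuilt parent, 0 on success. */
static int build_one(struct queue *q, struct node *n)
{
	if (n->parent && !n->parent->built) {
		push_head(q, n->parent);	/* build the parent first */
		return 1;
	}
	n->built = true;
	printf("built node %d\n", n->id);
	return 0;
}

static void rebuild(struct node *start)
{
	struct queue q = { NULL, NULL };
	int last = 0;

	push_tail(&q, start);
	while (q.head) {
		struct node *n = q.head;

		if (last < 0) {		/* a previous hard error: just drain */
			pop_head(&q);
			continue;
		}

		last = build_one(&q, n);
		if (last > 0)
			continue;	/* the parent now sits ahead of n */

		pop_head(&q);		/* n is done, keep rebuilding downward */
		if (n->child && !n->child->queued)
			push_tail(&q, n->child);
	}
}

int main(void)
{
	struct node parent = { .id = 1 }, child = { .id = 2, .parent = &parent };

	parent.child = &child;
	rebuild(&child);	/* starting at the child still builds the parent first */
	return 0;
}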
 
 
@@ -474,23 +518,15 @@ static bool has_new_snaps(struct ceph_snap_context *o,
  * Caller must hold snap_rwsem for read (i.e., the realm topology won't
  * change).
  */
-static void ceph_queue_cap_snap(struct ceph_inode_info *ci)
+static void ceph_queue_cap_snap(struct ceph_inode_info *ci,
+                               struct ceph_cap_snap **pcapsnap)
 {
        struct inode *inode = &ci->vfs_inode;
-       struct ceph_cap_snap *capsnap;
        struct ceph_snap_context *old_snapc, *new_snapc;
+       struct ceph_cap_snap *capsnap = *pcapsnap;
        struct ceph_buffer *old_blob = NULL;
        int used, dirty;
 
-       capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
-       if (!capsnap) {
-               pr_err("ENOMEM allocating ceph_cap_snap on %p\n", inode);
-               return;
-       }
-       capsnap->cap_flush.is_capsnap = true;
-       INIT_LIST_HEAD(&capsnap->cap_flush.i_list);
-       INIT_LIST_HEAD(&capsnap->cap_flush.g_list);
-
        spin_lock(&ci->i_ceph_lock);
        used = __ceph_caps_used(ci);
        dirty = __ceph_caps_dirty(ci);
@@ -511,12 +547,14 @@ static void ceph_queue_cap_snap(struct ceph_inode_info *ci)
                   as no new writes are allowed to start when pending, so any
                   writes in progress now were started before the previous
                   cap_snap.  lucky us. */
-               dout("queue_cap_snap %p already pending\n", inode);
+               dout("%s %p %llx.%llx already pending\n",
+                    __func__, inode, ceph_vinop(inode));
                goto update_snapc;
        }
        if (ci->i_wrbuffer_ref_head == 0 &&
            !(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) {
-               dout("queue_cap_snap %p nothing dirty|writing\n", inode);
+               dout("%s %p %llx.%llx nothing dirty|writing\n",
+                    __func__, inode, ceph_vinop(inode));
                goto update_snapc;
        }
 
@@ -536,20 +574,17 @@ static void ceph_queue_cap_snap(struct ceph_inode_info *ci)
        } else {
                if (!(used & CEPH_CAP_FILE_WR) &&
                    ci->i_wrbuffer_ref_head == 0) {
-                       dout("queue_cap_snap %p "
-                            "no new_snap|dirty_page|writing\n", inode);
+                       dout("%s %p %llx.%llx no new_snap|dirty_page|writing\n",
+                            __func__, inode, ceph_vinop(inode));
                        goto update_snapc;
                }
        }
 
-       dout("queue_cap_snap %p cap_snap %p queuing under %p %s %s\n",
-            inode, capsnap, old_snapc, ceph_cap_string(dirty),
-            capsnap->need_flush ? "" : "no_flush");
+       dout("%s %p %llx.%llx cap_snap %p queuing under %p %s %s\n",
+            __func__, inode, ceph_vinop(inode), capsnap, old_snapc,
+            ceph_cap_string(dirty), capsnap->need_flush ? "" : "no_flush");
        ihold(inode);
 
-       refcount_set(&capsnap->nref, 1);
-       INIT_LIST_HEAD(&capsnap->ci_item);
-
        capsnap->follows = old_snapc->seq;
        capsnap->issued = __ceph_caps_issued(ci, NULL);
        capsnap->dirty = dirty;
@@ -579,31 +614,30 @@ static void ceph_queue_cap_snap(struct ceph_inode_info *ci)
        list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
 
        if (used & CEPH_CAP_FILE_WR) {
-               dout("queue_cap_snap %p cap_snap %p snapc %p"
-                    " seq %llu used WR, now pending\n", inode,
+               dout("%s %p %llx.%llx cap_snap %p snapc %p seq %llu used WR,"
+                    " now pending\n", __func__, inode, ceph_vinop(inode),
                     capsnap, old_snapc, old_snapc->seq);
                capsnap->writing = 1;
        } else {
                /* note mtime, size NOW. */
                __ceph_finish_cap_snap(ci, capsnap);
        }
-       capsnap = NULL;
+       *pcapsnap = NULL;
        old_snapc = NULL;
 
 update_snapc:
-       if (ci->i_wrbuffer_ref_head == 0 &&
-           ci->i_wr_ref == 0 &&
-           ci->i_dirty_caps == 0 &&
-           ci->i_flushing_caps == 0) {
-               ci->i_head_snapc = NULL;
-       } else {
+       if (ci->i_wrbuffer_ref_head == 0 &&
+           ci->i_wr_ref == 0 &&
+           ci->i_dirty_caps == 0 &&
+           ci->i_flushing_caps == 0) {
+               ci->i_head_snapc = NULL;
+       } else {
                ci->i_head_snapc = ceph_get_snap_context(new_snapc);
                dout(" new snapc is %p\n", new_snapc);
        }
        spin_unlock(&ci->i_ceph_lock);
 
        ceph_buffer_put(old_blob);
-       kfree(capsnap);
        ceph_put_snap_context(old_snapc);
 }
 
@@ -632,27 +666,28 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
        capsnap->truncate_size = ci->i_truncate_size;
        capsnap->truncate_seq = ci->i_truncate_seq;
        if (capsnap->dirty_pages) {
-               dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
-                    "still has %d dirty pages\n", inode, capsnap,
-                    capsnap->context, capsnap->context->seq,
-                    ceph_cap_string(capsnap->dirty), capsnap->size,
-                    capsnap->dirty_pages);
+               dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu "
+                    "still has %d dirty pages\n", __func__, inode,
+                    ceph_vinop(inode), capsnap, capsnap->context,
+                    capsnap->context->seq, ceph_cap_string(capsnap->dirty),
+                    capsnap->size, capsnap->dirty_pages);
                return 0;
        }
 
        /* Fb cap still in use, delay it */
        if (ci->i_wb_ref) {
-               dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu "
-                    "used WRBUFFER, delaying\n", inode, capsnap,
-                    capsnap->context, capsnap->context->seq,
-                    ceph_cap_string(capsnap->dirty), capsnap->size);
+               dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu "
+                    "used WRBUFFER, delaying\n", __func__, inode,
+                    ceph_vinop(inode), capsnap, capsnap->context,
+                    capsnap->context->seq, ceph_cap_string(capsnap->dirty),
+                    capsnap->size);
                capsnap->writing = 1;
                return 0;
        }
 
        ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
-       dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
-            inode, capsnap, capsnap->context,
+       dout("%s %p %llx.%llx cap_snap %p snapc %p %llu %s s=%llu\n",
+            __func__, inode, ceph_vinop(inode), capsnap, capsnap->context,
             capsnap->context->seq, ceph_cap_string(capsnap->dirty),
             capsnap->size);
 
@@ -671,8 +706,9 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
 {
        struct ceph_inode_info *ci;
        struct inode *lastinode = NULL;
+       struct ceph_cap_snap *capsnap = NULL;
 
-       dout("queue_realm_cap_snaps %p %llx inodes\n", realm, realm->ino);
+       dout("%s %p %llx inode\n", __func__, realm, realm->ino);
 
        spin_lock(&realm->inodes_with_caps_lock);
        list_for_each_entry(ci, &realm->inodes_with_caps, i_snap_realm_item) {
@@ -682,13 +718,35 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
                spin_unlock(&realm->inodes_with_caps_lock);
                iput(lastinode);
                lastinode = inode;
-               ceph_queue_cap_snap(ci);
+
+               /*
+                * Allocate the capsnap memory outside of ceph_queue_cap_snap()
+                * to avoid frequent but mostly unnecessary memory
+                * allocation/freeing in this loop.
+                */
+               if (!capsnap) {
+                       capsnap = kmem_cache_zalloc(ceph_cap_snap_cachep, GFP_NOFS);
+                       if (!capsnap) {
+                               pr_err("ENOMEM allocating ceph_cap_snap on %p\n",
+                                      inode);
+                               return;
+                       }
+               }
+               capsnap->cap_flush.is_capsnap = true;
+               refcount_set(&capsnap->nref, 1);
+               INIT_LIST_HEAD(&capsnap->cap_flush.i_list);
+               INIT_LIST_HEAD(&capsnap->cap_flush.g_list);
+               INIT_LIST_HEAD(&capsnap->ci_item);
+
+               ceph_queue_cap_snap(ci, &capsnap);
                spin_lock(&realm->inodes_with_caps_lock);
        }
        spin_unlock(&realm->inodes_with_caps_lock);
        iput(lastinode);
 
-       dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
+       if (capsnap)
+               kmem_cache_free(ceph_cap_snap_cachep, capsnap);
+       dout("%s %p %llx done\n", __func__, realm, realm->ino);
 }
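
As the comment in the hunk above notes, the capsnap is now allocated once by the caller and handed to ceph_queue_cap_snap() through a pointer-to-pointer, which is cleared only when the object is actually queued; any leftover allocation is freed once after the loop. The following is a hedged userspace sketch of that allocate-outside/consume-on-demand pattern; struct item, consume() and the dirty flag are stand-ins, not the kernel structures.

/*
 * Illustrative sketch only: the caller allocates one object up front, the
 * helper clears *psnap only when it keeps the object, and whatever is left
 * over is freed once after the loop.
 */
#include <stdio.h>
#include <stdlib.h>

struct snap_like { int payload; };
struct item { int id; int dirty; };

/* Keeps *psnap only for dirty items; otherwise leaves it for reuse. */
static void consume(struct item *it, struct snap_like **psnap)
{
	if (!it->dirty)
		return;				/* nothing to record, keep the buffer */

	(*psnap)->payload = it->id;
	printf("recorded snapshot state for item %d\n", (*psnap)->payload);
	free(*psnap);				/* the kernel would queue it instead of freeing */
	*psnap = NULL;				/* tell the caller it was consumed */
}

int main(void)
{
	struct item items[] = { { 1, 0 }, { 2, 1 }, { 3, 0 }, { 4, 1 } };
	struct snap_like *snap = NULL;
	size_t i;

	for (i = 0; i < sizeof(items) / sizeof(items[0]); i++) {
		if (!snap) {
			snap = calloc(1, sizeof(*snap));
			if (!snap)
				return 1;	/* bail out, as the kernel loop does */
		}
		consume(&items[i], &snap);	/* may or may not keep snap */
	}

	free(snap);				/* leftover allocation, if any */
	return 0;
}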
 
 /*
@@ -707,14 +765,16 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
        __le64 *prior_parent_snaps;        /* encoded */
        struct ceph_snap_realm *realm = NULL;
        struct ceph_snap_realm *first_realm = NULL;
-       int invalidate = 0;
+       struct ceph_snap_realm *realm_to_rebuild = NULL;
+       int rebuild_snapcs;
        int err = -ENOMEM;
        LIST_HEAD(dirty_realms);
 
        lockdep_assert_held_write(&mdsc->snap_rwsem);
 
-       dout("update_snap_trace deletion=%d\n", deletion);
+       dout("%s deletion=%d\n", __func__, deletion);
 more:
+       rebuild_snapcs = 0;
        ceph_decode_need(&p, e, sizeof(*ri), bad);
        ri = p;
        p += sizeof(*ri);
@@ -738,10 +798,10 @@ more:
        err = adjust_snap_realm_parent(mdsc, realm, le64_to_cpu(ri->parent));
        if (err < 0)
                goto fail;
-       invalidate += err;
+       rebuild_snapcs += err;
 
        if (le64_to_cpu(ri->seq) > realm->seq) {
-               dout("update_snap_trace updating %llx %p %lld -> %lld\n",
+               dout("%s updating %llx %p %lld -> %lld\n", __func__,
                     realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
                /* update realm parameters, snap lists */
                realm->seq = le64_to_cpu(ri->seq);
@@ -763,22 +823,30 @@ more:
                if (realm->seq > mdsc->last_snap_seq)
                        mdsc->last_snap_seq = realm->seq;
 
-               invalidate = 1;
+               rebuild_snapcs = 1;
        } else if (!realm->cached_context) {
-               dout("update_snap_trace %llx %p seq %lld new\n",
+               dout("%s %llx %p seq %lld new\n", __func__,
                     realm->ino, realm, realm->seq);
-               invalidate = 1;
+               rebuild_snapcs = 1;
        } else {
-               dout("update_snap_trace %llx %p seq %lld unchanged\n",
+               dout("%s %llx %p seq %lld unchanged\n", __func__,
                     realm->ino, realm, realm->seq);
        }
 
-       dout("done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
-            realm, invalidate, p, e);
+       dout("done with %llx %p, rebuild_snapcs=%d, %p %p\n", realm->ino,
+            realm, rebuild_snapcs, p, e);
+
+       /*
+        * this will always track the uppermost parent realm from which
+        * we need to rebuild the snapshot contexts _downward_ in the
+        * hierarchy.
+        */
+       if (rebuild_snapcs)
+               realm_to_rebuild = realm;
 
-       /* invalidate when we reach the _end_ (root) of the trace */
-       if (invalidate && p >= e)
-               rebuild_snap_realms(realm, &dirty_realms);
+       /* rebuild_snapcs when we reach the _end_ (root) of the trace */
+       if (realm_to_rebuild && p >= e)
+               rebuild_snap_realms(realm_to_rebuild, &dirty_realms);
 
        if (!first_realm)
                first_realm = realm;
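
The renamed rebuild_snapcs counter and the new realm_to_rebuild pointer implement the idea described in the comment above: while decoding the child-to-root trace, only the uppermost realm that changed is remembered, and one downward rebuild from it at the end of the trace covers every descendant. A small illustrative sketch of that bookkeeping, with invented names, follows.

/*
 * Illustrative sketch: remember the uppermost changed node while walking a
 * child-to-root trace, then rebuild downward once when the trace ends.
 */
#include <stdbool.h>
#include <stdio.h>

struct realm_like {
	const char *name;
	struct realm_like *child;	/* a single child keeps the sketch short */
};

static void rebuild_downward(struct realm_like *r)
{
	for (; r; r = r->child)
		printf("rebuilding %s\n", r->name);
}

int main(void)
{
	struct realm_like root = { "root", NULL };
	struct realm_like mid  = { "mid",  NULL };
	struct realm_like leaf = { "leaf", NULL };
	struct realm_like *trace[] = { &leaf, &mid, &root };	/* child -> root */
	bool changed[] = { false, true, true };			/* decoded updates */
	struct realm_like *to_rebuild = NULL;
	size_t i;

	root.child = &mid;
	mid.child = &leaf;

	for (i = 0; i < 3; i++)
		if (changed[i])
			to_rebuild = trace[i];	/* ends up as the uppermost changed node */

	if (to_rebuild)
		rebuild_downward(to_rebuild);	/* one pass covers all descendants */
	return 0;
}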
@@ -814,7 +882,7 @@ fail:
                ceph_put_snap_realm(mdsc, realm);
        if (first_realm)
                ceph_put_snap_realm(mdsc, first_realm);
-       pr_err("update_snap_trace error %d\n", err);
+       pr_err("%s error %d\n", __func__, err);
        return err;
 }
 
@@ -831,7 +899,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
        struct inode *inode;
        struct ceph_mds_session *session = NULL;
 
-       dout("flush_snaps\n");
+       dout("%s\n", __func__);
        spin_lock(&mdsc->snap_flush_lock);
        while (!list_empty(&mdsc->snap_flush_list)) {
                ci = list_first_entry(&mdsc->snap_flush_list,
@@ -846,7 +914,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
        spin_unlock(&mdsc->snap_flush_lock);
 
        ceph_put_mds_session(session);
-       dout("flush_snaps done\n");
+       dout("%s done\n", __func__);
 }
 
 /**
@@ -928,8 +996,8 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
        trace_len = le32_to_cpu(h->trace_len);
        p += sizeof(*h);
 
-       dout("handle_snap from mds%d op %s split %llx tracelen %d\n", mds,
-            ceph_snap_op_name(op), split, trace_len);
+       dout("%s from mds%d op %s split %llx tracelen %d\n", __func__,
+            mds, ceph_snap_op_name(op), split, trace_len);
 
        mutex_lock(&session->s_mutex);
        inc_session_sequence(session);
@@ -989,13 +1057,13 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
                         */
                        if (ci->i_snap_realm->created >
                            le64_to_cpu(ri->created)) {
-                               dout(" leaving %p in newer realm %llx %p\n",
-                                    inode, ci->i_snap_realm->ino,
+                               dout(" leaving %p %llx.%llx in newer realm %llx %p\n",
+                                    inode, ceph_vinop(inode), ci->i_snap_realm->ino,
                                     ci->i_snap_realm);
                                goto skip_inode;
                        }
-                       dout(" will move %p to split realm %llx %p\n",
-                            inode, realm->ino, realm);
+                       dout(" will move %p %llx.%llx to split realm %llx %p\n",
+                            inode, ceph_vinop(inode), realm->ino, realm);
 
                        ceph_get_snap_realm(mdsc, realm);
                        ceph_change_snap_realm(inode, realm);
@@ -1038,7 +1106,7 @@ skip_inode:
        return;
 
 bad:
-       pr_err("corrupt snap message from mds%d\n", mds);
+       pr_err("%s corrupt snap message from mds%d\n", __func__, mds);
        ceph_msg_dump(msg);
 out:
        if (locked_rwsem)
@@ -1071,7 +1139,8 @@ struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc,
        }
        spin_unlock(&mdsc->snapid_map_lock);
        if (exist) {
-               dout("found snapid map %llx -> %x\n", exist->snap, exist->dev);
+               dout("%s found snapid map %llx -> %x\n", __func__,
+                    exist->snap, exist->dev);
                return exist;
        }
 
@@ -1115,11 +1184,13 @@ struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc,
        if (exist) {
                free_anon_bdev(sm->dev);
                kfree(sm);
-               dout("found snapid map %llx -> %x\n", exist->snap, exist->dev);
+               dout("%s found snapid map %llx -> %x\n", __func__,
+                    exist->snap, exist->dev);
                return exist;
        }
 
-       dout("create snapid map %llx -> %x\n", sm->snap, sm->dev);
+       dout("%s create snapid map %llx -> %x\n", __func__,
+            sm->snap, sm->dev);
        return sm;
 }
 
index 573bb95..e36e894 100644 (file)
@@ -60,6 +60,7 @@ const char *ceph_mds_op_name(int op)
        case CEPH_MDS_OP_LOOKUPINO:  return "lookupino";
        case CEPH_MDS_OP_LOOKUPNAME:  return "lookupname";
        case CEPH_MDS_OP_GETATTR:  return "getattr";
+       case CEPH_MDS_OP_GETVXATTR:  return "getvxattr";
        case CEPH_MDS_OP_SETXATTR: return "setxattr";
        case CEPH_MDS_OP_SETATTR: return "setattr";
        case CEPH_MDS_OP_RMXATTR: return "rmxattr";
index 4a3b77d..e6987d2 100644 (file)
@@ -865,6 +865,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
  */
 struct kmem_cache *ceph_inode_cachep;
 struct kmem_cache *ceph_cap_cachep;
+struct kmem_cache *ceph_cap_snap_cachep;
 struct kmem_cache *ceph_cap_flush_cachep;
 struct kmem_cache *ceph_dentry_cachep;
 struct kmem_cache *ceph_file_cachep;
@@ -893,6 +894,9 @@ static int __init init_caches(void)
        ceph_cap_cachep = KMEM_CACHE(ceph_cap, SLAB_MEM_SPREAD);
        if (!ceph_cap_cachep)
                goto bad_cap;
+       ceph_cap_snap_cachep = KMEM_CACHE(ceph_cap_snap, SLAB_MEM_SPREAD);
+       if (!ceph_cap_snap_cachep)
+               goto bad_cap_snap;
        ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush,
                                           SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
        if (!ceph_cap_flush_cachep)
@@ -932,6 +936,8 @@ bad_file:
 bad_dentry:
        kmem_cache_destroy(ceph_cap_flush_cachep);
 bad_cap_flush:
+       kmem_cache_destroy(ceph_cap_snap_cachep);
+bad_cap_snap:
        kmem_cache_destroy(ceph_cap_cachep);
 bad_cap:
        kmem_cache_destroy(ceph_inode_cachep);
@@ -948,6 +954,7 @@ static void destroy_caches(void)
 
        kmem_cache_destroy(ceph_inode_cachep);
        kmem_cache_destroy(ceph_cap_cachep);
+       kmem_cache_destroy(ceph_cap_snap_cachep);
        kmem_cache_destroy(ceph_cap_flush_cachep);
        kmem_cache_destroy(ceph_dentry_cachep);
        kmem_cache_destroy(ceph_file_cachep);
index 0bd97ae..a1ecc41 100644 (file)
@@ -231,7 +231,7 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
        if (refcount_dec_and_test(&capsnap->nref)) {
                if (capsnap->xattr_blob)
                        ceph_buffer_put(capsnap->xattr_blob);
-               kfree(capsnap);
+               kmem_cache_free(ceph_cap_snap_cachep, capsnap);
        }
 }
 
@@ -884,6 +884,8 @@ struct ceph_snap_realm {
 
        struct list_head dirty_item;     /* if realm needs new context */
 
+       struct list_head rebuild_item;   /* rebuild snap realms _downward_ in hierarchy */
+
        /* the current set of snaps for this realm */
        struct ceph_snap_context *cached_context;
 
@@ -939,7 +941,7 @@ extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
                             struct ceph_msg *msg);
 extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
                                  struct ceph_cap_snap *capsnap);
-extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
+extern void ceph_cleanup_global_and_empty_realms(struct ceph_mds_client *mdsc);
 
 extern struct ceph_snapid_map *ceph_get_snapid_map(struct ceph_mds_client *mdsc,
                                                   u64 snap);
@@ -1049,6 +1051,7 @@ static inline bool ceph_inode_is_shutdown(struct inode *inode)
 
 /* xattr.c */
 int __ceph_setxattr(struct inode *, const char *, const void *, size_t, int);
+int ceph_do_getvxattr(struct inode *inode, const char *name, void *value, size_t size);
 ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t);
 extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
 extern struct ceph_buffer *__ceph_build_xattrs_blob(struct ceph_inode_info *ci);
@@ -1214,7 +1217,7 @@ extern void __ceph_touch_fmode(struct ceph_inode_info *ci,
 /* addr.c */
 extern const struct address_space_operations ceph_aops;
 extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
-extern int ceph_uninline_data(struct file *filp, struct page *locked_page);
+extern int ceph_uninline_data(struct file *file);
 extern int ceph_pool_perm_check(struct inode *inode, int need);
 extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
 int ceph_purge_inode_cap(struct inode *inode, struct ceph_cap *cap, bool *invalidate);
index fcf7dfd..afec840 100644 (file)
@@ -923,10 +923,13 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_inode_xattr *xattr;
-       struct ceph_vxattr *vxattr = NULL;
+       struct ceph_vxattr *vxattr;
        int req_mask;
        ssize_t err;
 
+       if (strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN))
+               goto handle_non_vxattrs;
+
        /* let's see if a virtual xattr was requested */
        vxattr = ceph_match_vxattr(inode, name);
        if (vxattr) {
@@ -945,8 +948,14 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
                                err = -ERANGE;
                }
                return err;
+       } else {
+               err = ceph_do_getvxattr(inode, name, value, size);
+               /* this would happen with a new client and old server combo */
+               if (err == -EOPNOTSUPP)
+                       err = -ENODATA;
+               return err;
        }
-
+handle_non_vxattrs:
        req_mask = __get_request_mask(inode);
 
        spin_lock(&ci->i_ceph_lock);
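
The xattr.c hunk above changes the lookup order: names without the "ceph." prefix skip the vxattr machinery entirely, known virtual xattrs are still answered locally, and any other "ceph."-prefixed name is now forwarded to the MDS via the new getvxattr op, with -EOPNOTSUPP from an older server reported as -ENODATA. Below is a hedged userspace sketch of that dispatch order; local_vxattr(), server_getvxattr() and regular_xattr() are stand-ins for the kernel helpers, and the error values assume a Linux <errno.h>.

#include <errno.h>
#include <stdio.h>
#include <string.h>

#define VXATTR_PREFIX "ceph."

static int local_vxattr(const char *name)	/* stand-in for ceph_match_vxattr() */
{
	return strcmp(name, "ceph.dir.layout") == 0 ? 4 : -1;
}

static int server_getvxattr(const char *name)	/* stand-in for the MDS getvxattr op */
{
	(void)name;
	return -EOPNOTSUPP;			/* pretend the server is too old */
}

static int regular_xattr(const char *name)	/* stand-in for the cached xattr path */
{
	return strcmp(name, "user.comment") == 0 ? 7 : -ENODATA;
}

static int getxattr_sketch(const char *name)
{
	int err;

	if (strncmp(name, VXATTR_PREFIX, strlen(VXATTR_PREFIX)))
		return regular_xattr(name);	/* not a vxattr at all */

	err = local_vxattr(name);
	if (err >= 0)
		return err;			/* answered from local state */

	err = server_getvxattr(name);
	if (err == -EOPNOTSUPP)
		err = -ENODATA;			/* old server: behave as "no such xattr" */
	return err;
}

int main(void)
{
	const char *names[] = { "user.comment", "ceph.dir.layout", "ceph.dir.pin" };

	for (int i = 0; i < 3; i++)
		printf("%-16s -> %d\n", names[i], getxattr_sketch(names[i]));
	return 0;
}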
index 7ad6c3d..86bf82d 100644 (file)
@@ -28,8 +28,8 @@
 
 
 #define CEPH_INO_ROOT   1
-#define CEPH_INO_CEPH   2       /* hidden .ceph dir */
-#define CEPH_INO_DOTDOT 3      /* used by ceph fuse for parent (..) */
+#define CEPH_INO_CEPH   2            /* hidden .ceph dir */
+#define CEPH_INO_GLOBAL_SNAPREALM  3 /* global dummy snaprealm */
 
 /* arbitrary limit on max # of monitors (cluster of 3 is typical) */
 #define CEPH_MAX_MON   31
@@ -328,6 +328,7 @@ enum {
        CEPH_MDS_OP_LOOKUPPARENT = 0x00103,
        CEPH_MDS_OP_LOOKUPINO  = 0x00104,
        CEPH_MDS_OP_LOOKUPNAME = 0x00105,
+       CEPH_MDS_OP_GETVXATTR  = 0x00106,
 
        CEPH_MDS_OP_SETXATTR   = 0x01105,
        CEPH_MDS_OP_RMXATTR    = 0x01106,
index edf62ea..00af2c9 100644 (file)
@@ -284,6 +284,7 @@ DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld)
 
 extern struct kmem_cache *ceph_inode_cachep;
 extern struct kmem_cache *ceph_cap_cachep;
+extern struct kmem_cache *ceph_cap_snap_cachep;
 extern struct kmem_cache *ceph_cap_flush_cachep;
 extern struct kmem_cache *ceph_dentry_cachep;
 extern struct kmem_cache *ceph_file_cachep;
index c81379f..c6e5bfc 100644 (file)
@@ -1773,10 +1773,8 @@ static int prepare_read_data(struct ceph_connection *con)
 
                bv.bv_page = con->bounce_page;
                bv.bv_offset = 0;
-               set_in_bvec(con, &bv);
-       } else {
-               set_in_bvec(con, &bv);
        }
+       set_in_bvec(con, &bv);
        con->v2.in_state = IN_S_PREPARE_READ_DATA_CONT;
        return 0;
 }
@@ -1807,10 +1805,8 @@ static void prepare_read_data_cont(struct ceph_connection *con)
                if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
                        bv.bv_page = con->bounce_page;
                        bv.bv_offset = 0;
-                       set_in_bvec(con, &bv);
-               } else {
-                       set_in_bvec(con, &bv);
                }
+               set_in_bvec(con, &bv);
                WARN_ON(con->v2.in_state != IN_S_PREPARE_READ_DATA_CONT);
                return;
        }