diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 26e6643..c1570fa 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -12,6 +12,7 @@
 #include <linux/signal.h>
 #include <linux/iversion.h>
 #include <linux/ktime.h>
+#include <linux/netfs.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -61,6 +62,9 @@
        (CONGESTION_ON_THRESH(congestion_kb) -                          \
         (CONGESTION_ON_THRESH(congestion_kb) >> 2))
 
+static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
+                                       struct page *page, void **_fsdata);
+
 static inline struct ceph_snap_context *page_snap_context(struct page *page)
 {
        if (PagePrivate(page))
@@ -124,8 +128,7 @@ static int ceph_set_page_dirty(struct page *page)
         * PagePrivate so that we get invalidatepage callback.
         */
        BUG_ON(PagePrivate(page));
-       page->private = (unsigned long)snapc;
-       SetPagePrivate(page);
+       attach_page_private(page, snapc);
 
        ret = __set_page_dirty_nobuffers(page);
        WARN_ON(!PageLocked(page));
@@ -144,19 +147,19 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
 {
        struct inode *inode;
        struct ceph_inode_info *ci;
-       struct ceph_snap_context *snapc = page_snap_context(page);
+       struct ceph_snap_context *snapc;
+
+       wait_on_page_fscache(page);
 
        inode = page->mapping->host;
        ci = ceph_inode(inode);
 
-       if (offset != 0 || length != PAGE_SIZE) {
+       if (offset != 0 || length != thp_size(page)) {
                dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
                     inode, page, page->index, offset, length);
                return;
        }
 
-       ceph_invalidate_fscache_page(inode, page);
-
        WARN_ON(!PageLocked(page));
        if (!PagePrivate(page))
                return;
@@ -164,333 +167,222 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
        dout("%p invalidatepage %p idx %lu full dirty page\n",
             inode, page, page->index);
 
+       snapc = detach_page_private(page);
        ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
        ceph_put_snap_context(snapc);
-       page->private = 0;
-       ClearPagePrivate(page);
 }
 
-static int ceph_releasepage(struct page *page, gfp_t g)
+static int ceph_releasepage(struct page *page, gfp_t gfp)
 {
        dout("%p releasepage %p idx %lu (%sdirty)\n", page->mapping->host,
             page, page->index, PageDirty(page) ? "" : "not ");
 
-       /* Can we release the page from the cache? */
-       if (!ceph_release_fscache_page(page, g))
-               return 0;
-
+       if (PageFsCache(page)) {
+               if (!(gfp & __GFP_DIRECT_RECLAIM) || !(gfp & __GFP_FS))
+                       return 0;
+               wait_on_page_fscache(page);
+       }
        return !PagePrivate(page);
 }
 
-/* read a single page, without unlocking it. */
-static int ceph_do_readpage(struct file *filp, struct page *page)
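+/*
+ * Called by netfs to let us widen the readahead window: round the start
+ * of the request down and its length up so that the whole request is
+ * aligned to the file layout's stripe_unit.
+ */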
+static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq)
 {
-       struct inode *inode = file_inode(filp);
+       struct inode *inode = rreq->mapping->host;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_osd_client *osdc = &fsc->client->osdc;
-       struct ceph_osd_request *req;
-       struct ceph_vino vino = ceph_vino(inode);
-       int err = 0;
-       u64 off = page_offset(page);
-       u64 len = PAGE_SIZE;
-
-       if (off >= i_size_read(inode)) {
-               zero_user_segment(page, 0, PAGE_SIZE);
-               SetPageUptodate(page);
-               return 0;
-       }
-
-       if (ci->i_inline_version != CEPH_INLINE_NONE) {
-               /*
-                * Uptodate inline data should have been added
-                * into page cache while getting Fcr caps.
-                */
-               if (off == 0)
-                       return -EINVAL;
-               zero_user_segment(page, 0, PAGE_SIZE);
-               SetPageUptodate(page);
-               return 0;
-       }
-
-       err = ceph_readpage_from_fscache(inode, page);
-       if (err == 0)
-               return -EINPROGRESS;
-
-       dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
-            vino.ino, vino.snap, filp, off, len, page, page->index);
-       req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, 0, 1,
-                                   CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL,
-                                   ci->i_truncate_seq, ci->i_truncate_size,
-                                   false);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
+       struct ceph_file_layout *lo = &ci->i_layout;
+       u32 blockoff;
+       u64 blockno;
 
-       osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
+       /* Expand the start downward */
+       blockno = div_u64_rem(rreq->start, lo->stripe_unit, &blockoff);
+       rreq->start = blockno * lo->stripe_unit;
+       rreq->len += blockoff;
 
-       err = ceph_osdc_start_request(osdc, req, false);
-       if (!err)
-               err = ceph_osdc_wait_request(osdc, req);
-
-       ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
-                                req->r_end_latency, err);
-
-       ceph_osdc_put_request(req);
-       dout("readpage result %d\n", err);
-
-       if (err == -ENOENT)
-               err = 0;
-       if (err < 0) {
-               ceph_fscache_readpage_cancel(inode, page);
-               if (err == -EBLOCKLISTED)
-                       fsc->blocklisted = true;
-               goto out;
-       }
-       if (err < PAGE_SIZE)
-               /* zero fill remainder of page */
-               zero_user_segment(page, err, PAGE_SIZE);
-       else
-               flush_dcache_page(page);
-
-       SetPageUptodate(page);
-       ceph_readpage_to_fscache(inode, page);
-
-out:
-       return err < 0 ? err : 0;
+       /* Now, round up the length to the next block */
+       rreq->len = roundup(rreq->len, lo->stripe_unit);
 }
 
-static int ceph_readpage(struct file *filp, struct page *page)
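+/*
+ * Clamp a read subrequest so that it does not cross an object boundary
+ * and does not exceed the rsize mount option.
+ */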
+static bool ceph_netfs_clamp_length(struct netfs_read_subrequest *subreq)
 {
-       int r = ceph_do_readpage(filp, page);
-       if (r != -EINPROGRESS)
-               unlock_page(page);
-       else
-               r = 0;
-       return r;
+       struct inode *inode = subreq->rreq->mapping->host;
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       u64 objno, objoff;
+       u32 xlen;
+
+       /* Truncate the extent at the end of the current block */
+       ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len,
+                                     &objno, &objoff, &xlen);
+       subreq->len = min(xlen, fsc->mount_options->rsize);
+       return true;
 }
 
-/*
- * Finish an async read(ahead) op.
- */
-static void finish_read(struct ceph_osd_request *req)
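+/*
+ * Completion callback for the async OSD read issued by
+ * ceph_netfs_issue_op(): record read metrics, treat -ENOENT (no backing
+ * object) as success with no data, and hand the result back to netfs via
+ * netfs_subreq_terminated() before dropping the page and inode
+ * references taken when the read was submitted.
+ */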
+static void finish_netfs_read(struct ceph_osd_request *req)
 {
-       struct inode *inode = req->r_inode;
-       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_osd_data *osd_data;
-       int rc = req->r_result <= 0 ? req->r_result : 0;
-       int bytes = req->r_result >= 0 ? req->r_result : 0;
+       struct ceph_fs_client *fsc = ceph_inode_to_client(req->r_inode);
+       struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
+       struct netfs_read_subrequest *subreq = req->r_priv;
        int num_pages;
-       int i;
+       int err = req->r_result;
 
-       dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
-       if (rc == -EBLOCKLISTED)
-               ceph_inode_to_client(inode)->blocklisted = true;
+       ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency,
+                                req->r_end_latency, err);
 
-       /* unlock all pages, zeroing any data we didn't read */
-       osd_data = osd_req_op_extent_osd_data(req, 0);
-       BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
-       num_pages = calc_pages_for((u64)osd_data->alignment,
-                                       (u64)osd_data->length);
-       for (i = 0; i < num_pages; i++) {
-               struct page *page = osd_data->pages[i];
-
-               if (rc < 0 && rc != -ENOENT) {
-                       ceph_fscache_readpage_cancel(inode, page);
-                       goto unlock;
-               }
-               if (bytes < (int)PAGE_SIZE) {
-                       /* zero (remainder of) page */
-                       int s = bytes < 0 ? 0 : bytes;
-                       zero_user_segment(page, s, PAGE_SIZE);
-               }
-               dout("finish_read %p uptodate %p idx %lu\n", inode, page,
-                    page->index);
-               flush_dcache_page(page);
-               SetPageUptodate(page);
-               ceph_readpage_to_fscache(inode, page);
-unlock:
-               unlock_page(page);
-               put_page(page);
-               bytes -= PAGE_SIZE;
-       }
+       dout("%s: result %d subreq->len=%zu i_size=%lld\n", __func__, req->r_result,
+            subreq->len, i_size_read(req->r_inode));
 
-       ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
-                                req->r_end_latency, rc);
+       /* no object means success but no data */
+       if (err == -ENOENT)
+               err = 0;
+       else if (err == -EBLOCKLISTED)
+               fsc->blocklisted = true;
+
+       if (err >= 0 && err < subreq->len)
+               __set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
+
+       netfs_subreq_terminated(subreq, err, true);
 
-       kfree(osd_data->pages);
+       num_pages = calc_pages_for(osd_data->alignment, osd_data->length);
+       ceph_put_page_vector(osd_data->pages, num_pages, false);
+       iput(req->r_inode);
 }
 
-/*
- * start an async read(ahead) operation.  return nr_pages we submitted
- * a read for on success, or negative error code.
- */
-static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
-                     struct list_head *page_list, int max)
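+/*
+ * Issue a single read subrequest on behalf of netfs: build an OSD read
+ * for the subrequest range, attach the pages already present in the page
+ * cache via an xarray-backed iov_iter, and submit it asynchronously with
+ * finish_netfs_read() as the completion callback.
+ */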
+static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
 {
-       struct ceph_osd_client *osdc =
-               &ceph_inode_to_client(inode)->client->osdc;
+       struct netfs_read_request *rreq = subreq->rreq;
+       struct inode *inode = rreq->mapping->host;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct page *page = lru_to_page(page_list);
-       struct ceph_vino vino;
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_osd_request *req;
-       u64 off;
-       u64 len;
-       int i;
+       struct ceph_vino vino = ceph_vino(inode);
+       struct iov_iter iter;
        struct page **pages;
-       pgoff_t next_index;
-       int nr_pages = 0;
-       int got = 0;
-       int ret = 0;
-
-       if (!rw_ctx) {
-               /* caller of readpages does not hold buffer and read caps
-                * (fadvise, madvise and readahead cases) */
-               int want = CEPH_CAP_FILE_CACHE;
-               ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want,
-                                       true, &got);
-               if (ret < 0) {
-                       dout("start_read %p, error getting cap\n", inode);
-               } else if (!(got & want)) {
-                       dout("start_read %p, no cache cap\n", inode);
-                       ret = 0;
-               }
-               if (ret <= 0) {
-                       if (got)
-                               ceph_put_cap_refs(ci, got);
-                       while (!list_empty(page_list)) {
-                               page = lru_to_page(page_list);
-                               list_del(&page->lru);
-                               put_page(page);
-                       }
-                       return ret;
-               }
-       }
-
-       off = (u64) page_offset(page);
+       size_t page_off;
+       int err = 0;
+       u64 len = subreq->len;
 
-       /* count pages */
-       next_index = page->index;
-       list_for_each_entry_reverse(page, page_list, lru) {
-               if (page->index != next_index)
-                       break;
-               nr_pages++;
-               next_index++;
-               if (max && nr_pages == max)
-                       break;
-       }
-       len = nr_pages << PAGE_SHIFT;
-       dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
-            off, len);
-       vino = ceph_vino(inode);
-       req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
-                                   0, 1, CEPH_OSD_OP_READ,
-                                   CEPH_OSD_FLAG_READ, NULL,
-                                   ci->i_truncate_seq, ci->i_truncate_size,
-                                   false);
+       req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, subreq->start, &len,
+                       0, 1, CEPH_OSD_OP_READ,
+                       CEPH_OSD_FLAG_READ | fsc->client->osdc.client->options->read_from_replica,
+                       NULL, ci->i_truncate_seq, ci->i_truncate_size, false);
        if (IS_ERR(req)) {
-               ret = PTR_ERR(req);
+               err = PTR_ERR(req);
+               req = NULL;
                goto out;
        }
 
-       /* build page vector */
-       nr_pages = calc_pages_for(0, len);
-       pages = kmalloc_array(nr_pages, sizeof(*pages), GFP_KERNEL);
-       if (!pages) {
-               ret = -ENOMEM;
-               goto out_put;
-       }
-       for (i = 0; i < nr_pages; ++i) {
-               page = list_entry(page_list->prev, struct page, lru);
-               BUG_ON(PageLocked(page));
-               list_del(&page->lru);
-
-               dout("start_read %p adding %p idx %lu\n", inode, page,
-                    page->index);
-               if (add_to_page_cache_lru(page, &inode->i_data, page->index,
-                                         GFP_KERNEL)) {
-                       ceph_fscache_uncache_page(inode, page);
-                       put_page(page);
-                       dout("start_read %p add_to_page_cache failed %p\n",
-                            inode, page);
-                       nr_pages = i;
-                       if (nr_pages > 0) {
-                               len = nr_pages << PAGE_SHIFT;
-                               osd_req_op_extent_update(req, 0, len);
-                               break;
-                       }
-                       goto out_pages;
-               }
-               pages[i] = page;
+       dout("%s: pos=%llu orig_len=%zu len=%llu\n", __func__, subreq->start, subreq->len, len);
+       iov_iter_xarray(&iter, READ, &rreq->mapping->i_pages, subreq->start, len);
+       err = iov_iter_get_pages_alloc(&iter, &pages, len, &page_off);
+       if (err < 0) {
+               dout("%s: iov_iter_get_pages_alloc returned %d\n", __func__, err);
+               goto out;
        }
+
+       /* should always give us a page-aligned read */
+       WARN_ON_ONCE(page_off);
+       len = err;
+
        osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
-       req->r_callback = finish_read;
+       req->r_callback = finish_netfs_read;
+       req->r_priv = subreq;
        req->r_inode = inode;
+       ihold(inode);
 
-       dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
-       ret = ceph_osdc_start_request(osdc, req, false);
-       if (ret < 0)
-               goto out_pages;
+       err = ceph_osdc_start_request(req->r_osdc, req, false);
+       if (err)
+               iput(inode);
+out:
        ceph_osdc_put_request(req);
+       if (err)
+               netfs_subreq_terminated(subreq, err, false);
+       dout("%s: result %d\n", __func__, err);
+}
 
-       /* After adding locked pages to page cache, the inode holds cache cap.
-        * So we can drop our cap refs. */
-       if (got)
-               ceph_put_cap_refs(ci, got);
+static void ceph_init_rreq(struct netfs_read_request *rreq, struct file *file)
+{
+}
 
-       return nr_pages;
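+/*
+ * netfs calls this to clean up the private data once it is done with it:
+ * drop the cap references that ceph_readahead() took and passed in via
+ * netfs_readahead().
+ */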
+static void ceph_readahead_cleanup(struct address_space *mapping, void *priv)
+{
+       struct inode *inode = mapping->host;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int got = (uintptr_t)priv;
 
-out_pages:
-       for (i = 0; i < nr_pages; ++i) {
-               ceph_fscache_readpage_cancel(inode, pages[i]);
-               unlock_page(pages[i]);
-       }
-       ceph_put_page_vector(pages, nr_pages, false);
-out_put:
-       ceph_osdc_put_request(req);
-out:
        if (got)
                ceph_put_cap_refs(ci, got);
-       return ret;
 }
 
+const struct netfs_read_request_ops ceph_netfs_read_ops = {
+       .init_rreq              = ceph_init_rreq,
+       .is_cache_enabled       = ceph_is_cache_enabled,
+       .begin_cache_operation  = ceph_begin_cache_operation,
+       .issue_op               = ceph_netfs_issue_op,
+       .expand_readahead       = ceph_netfs_expand_readahead,
+       .clamp_length           = ceph_netfs_clamp_length,
+       .check_write_begin      = ceph_netfs_check_write_begin,
+       .cleanup                = ceph_readahead_cleanup,
+};
 
-/*
- * Read multiple pages.  Leave pages we don't read + unlock in page_list;
- * the caller (VM) cleans them up.
- */
-static int ceph_readpages(struct file *file, struct address_space *mapping,
-                         struct list_head *page_list, unsigned nr_pages)
+/* read a single page */
+static int ceph_readpage(struct file *file, struct page *page)
 {
        struct inode *inode = file_inode(file);
-       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_file_info *fi = file->private_data;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_vino vino = ceph_vino(inode);
+       u64 off = page_offset(page);
+       u64 len = thp_size(page);
+
+       if (ci->i_inline_version != CEPH_INLINE_NONE) {
+               /*
+                * Uptodate inline data should have been added
+                * into page cache while getting Fcr caps.
+                */
+               if (off == 0) {
+                       unlock_page(page);
+                       return -EINVAL;
+               }
+               zero_user_segment(page, 0, thp_size(page));
+               SetPageUptodate(page);
+               unlock_page(page);
+               return 0;
+       }
+
+       dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
+            vino.ino, vino.snap, file, off, len, page, page->index);
+
+       return netfs_readpage(file, page, &ceph_netfs_read_ops, NULL);
+}
+
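+/*
+ * Readahead entry point.  If the caller does not already hold caps via
+ * an rw context, try to take a Fc cap reference and give up if we can't.
+ * The reference is handed to netfs_readahead() as private data so that
+ * ceph_readahead_cleanup() can drop it when netfs is finished.
+ */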
+static void ceph_readahead(struct readahead_control *ractl)
+{
+       struct inode *inode = file_inode(ractl->file);
+       struct ceph_file_info *fi = ractl->file->private_data;
        struct ceph_rw_context *rw_ctx;
-       int rc = 0;
-       int max = 0;
+       int got = 0;
+       int ret = 0;
 
        if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
-               return -EINVAL;
+               return;
 
-       rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
-                                        &nr_pages);
+       rw_ctx = ceph_find_rw_context(fi);
+       if (!rw_ctx) {
+               /*
+                * readahead callers do not necessarily hold Fcb caps
+                * (e.g. fadvise, madvise).
+                */
+               int want = CEPH_CAP_FILE_CACHE;
 
-       if (rc == 0)
-               goto out;
+               ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want, true, &got);
+               if (ret < 0)
+                       dout("start_read %p, error getting cap\n", inode);
+               else if (!(got & want))
+                       dout("start_read %p, no cache cap\n", inode);
 
-       rw_ctx = ceph_find_rw_context(fi);
-       max = fsc->mount_options->rsize >> PAGE_SHIFT;
-       dout("readpages %p file %p ctx %p nr_pages %d max %d\n",
-            inode, file, rw_ctx, nr_pages, max);
-       while (!list_empty(page_list)) {
-               rc = start_read(inode, rw_ctx, page_list, max);
-               if (rc < 0)
-                       goto out;
+               if (ret <= 0)
+                       return;
        }
-out:
-       ceph_fscache_readpages_cancel(inode, page_list);
-
-       dout("readpages %p file %p ret %d\n", inode, file, rc);
-       return rc;
+       netfs_readahead(ractl, &ceph_netfs_read_ops, (void *)(uintptr_t)got);
 }
 
 struct ceph_writeback_ctl
@@ -585,8 +477,8 @@ static u64 get_writepages_data_length(struct inode *inode,
                spin_unlock(&ci->i_ceph_lock);
                WARN_ON(!found);
        }
-       if (end > page_offset(page) + PAGE_SIZE)
-               end = page_offset(page) + PAGE_SIZE;
+       if (end > page_offset(page) + thp_size(page))
+               end = page_offset(page) + thp_size(page);
        return end > start ? end - start : 0;
 }
 
@@ -604,7 +496,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        struct ceph_snap_context *snapc, *oldest;
        loff_t page_off = page_offset(page);
        int err;
-       loff_t len = PAGE_SIZE;
+       loff_t len = thp_size(page);
        struct ceph_writeback_ctl ceph_wbc;
        struct ceph_osd_client *osdc = &fsc->client->osdc;
        struct ceph_osd_request *req;
@@ -632,7 +524,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        /* is this a partial page at end of file? */
        if (page_off >= ceph_wbc.i_size) {
                dout("%p page eof %llu\n", page, ceph_wbc.i_size);
-               page->mapping->a_ops->invalidatepage(page, 0, PAGE_SIZE);
+               page->mapping->a_ops->invalidatepage(page, 0, thp_size(page));
                return 0;
        }
 
@@ -658,7 +550,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        }
 
        /* it may be a short write due to an object boundary */
-       WARN_ON_ONCE(len > PAGE_SIZE);
+       WARN_ON_ONCE(len > thp_size(page));
        osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
        dout("writepage %llu~%llu (%llu bytes)\n", page_off, len, len);
 
@@ -667,7 +559,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        if (!err)
                err = ceph_osdc_wait_request(osdc, req);
 
-       ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+       ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
                                  req->r_end_latency, err);
 
        ceph_osdc_put_request(req);
@@ -695,8 +587,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
                dout("writepage cleaned page %p\n", page);
                err = 0;  /* vfs expects us to return 0 */
        }
-       page->private = 0;
-       ClearPagePrivate(page);
+       oldest = detach_page_private(page);
+       WARN_ON_ONCE(oldest != snapc);
        end_page_writeback(page);
        ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
        ceph_put_snap_context(snapc);  /* page's reference */
@@ -755,7 +647,7 @@ static void writepages_finish(struct ceph_osd_request *req)
                ceph_clear_error_write(ci);
        }
 
-       ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+       ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
                                  req->r_end_latency, rc);
 
        /*
@@ -788,11 +680,9 @@ static void writepages_finish(struct ceph_osd_request *req)
                                clear_bdi_congested(inode_to_bdi(inode),
                                                    BLK_RW_ASYNC);
 
-                       ceph_put_snap_context(page_snap_context(page));
-                       page->private = 0;
-                       ClearPagePrivate(page);
-                       dout("unlocking %p\n", page);
+                       ceph_put_snap_context(detach_page_private(page));
                        end_page_writeback(page);
+                       dout("unlocking %p\n", page);
 
                        if (remove_page)
                                generic_error_remove_page(inode->i_mapping,
@@ -949,7 +839,7 @@ get_more_pages:
                                    page_offset(page) >= i_size_read(inode)) &&
                                    clear_page_dirty_for_io(page))
                                        mapping->a_ops->invalidatepage(page,
-                                                               0, PAGE_SIZE);
+                                                               0, thp_size(page));
                                unlock_page(page);
                                continue;
                        }
@@ -1038,7 +928,7 @@ get_more_pages:
                        pages[locked_pages++] = page;
                        pvec.pages[i] = NULL;
 
-                       len += PAGE_SIZE;
+                       len += thp_size(page);
                }
 
                /* did we get anything? */
@@ -1087,7 +977,7 @@ new_request:
                        BUG_ON(IS_ERR(req));
                }
                BUG_ON(len < page_offset(pages[locked_pages - 1]) +
-                            PAGE_SIZE - offset);
+                            thp_size(page) - offset);
 
                req->r_callback = writepages_finish;
                req->r_inode = inode;
@@ -1117,7 +1007,7 @@ new_request:
                        }
 
                        set_page_writeback(pages[i]);
-                       len += PAGE_SIZE;
+                       len += thp_size(page);
                }
 
                if (ceph_wbc.size_stable) {
@@ -1126,7 +1016,7 @@ new_request:
                        /* writepages_finish() clears writeback pages
                         * according to the data length, so make sure
                         * data length covers all locked pages */
-                       u64 min_len = len + 1 - PAGE_SIZE;
+                       u64 min_len = len + 1 - thp_size(page);
                        len = get_writepages_data_length(inode, pages[i - 1],
                                                         offset);
                        len = max(len, min_len);
@@ -1302,6 +1192,31 @@ ceph_find_incompatible(struct page *page)
        return NULL;
 }
 
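+/*
+ * Called from netfs_write_begin().  If the page is already dirty under
+ * an older, incompatible snap context, drop the page, kick off writeback
+ * and wait for the old snap context to be written out, then return
+ * -EAGAIN so the caller retries.
+ */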
+static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
+                                       struct page *page, void **_fsdata)
+{
+       struct inode *inode = file_inode(file);
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_snap_context *snapc;
+
+       snapc = ceph_find_incompatible(page);
+       if (snapc) {
+               int r;
+
+               unlock_page(page);
+               put_page(page);
+               if (IS_ERR(snapc))
+                       return PTR_ERR(snapc);
+
+               ceph_queue_writeback(inode);
+               r = wait_event_killable(ci->i_cap_wq,
+                                       context_is_writeable_or_written(inode, snapc));
+               ceph_put_snap_context(snapc);
+               return r == 0 ? -EAGAIN : r;
+       }
+       return 0;
+}
+
 /*
  * We are only allowed to write into/dirty the page if the page is
  * clean, or already dirty within the same snap context.
@@ -1312,75 +1227,47 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
 {
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_snap_context *snapc;
        struct page *page = NULL;
        pgoff_t index = pos >> PAGE_SHIFT;
-       int pos_in_page = pos & ~PAGE_MASK;
-       int r = 0;
+       int r;
 
-       dout("write_begin file %p inode %p page %p %d~%d\n", file, inode, page, (int)pos, (int)len);
-
-       for (;;) {
+       /*
+        * Uninlining should have already been done and everything updated, EXCEPT
+        * for inline_version sent to the MDS.
+        */
+       if (ci->i_inline_version != CEPH_INLINE_NONE) {
                page = grab_cache_page_write_begin(mapping, index, flags);
-               if (!page) {
-                       r = -ENOMEM;
-                       break;
-               }
-
-               snapc = ceph_find_incompatible(page);
-               if (snapc) {
-                       if (IS_ERR(snapc)) {
-                               r = PTR_ERR(snapc);
-                               break;
-                       }
-                       unlock_page(page);
-                       put_page(page);
-                       page = NULL;
-                       ceph_queue_writeback(inode);
-                       r = wait_event_killable(ci->i_cap_wq,
-                                               context_is_writeable_or_written(inode, snapc));
-                       ceph_put_snap_context(snapc);
-                       if (r != 0)
-                               break;
-                       continue;
-               }
-
-               if (PageUptodate(page)) {
-                       dout(" page %p already uptodate\n", page);
-                       break;
-               }
+               if (!page)
+                       return -ENOMEM;
 
                /*
-                * In some cases we don't need to read at all:
-                * - full page write
-                * - write that lies completely beyond EOF
-                * - write that covers the the page from start to EOF or beyond it
+                * The inline_version on a new inode is set to 1. If that's the
+                * case, then the page is brand new and isn't yet Uptodate.
                 */
-               if ((pos_in_page == 0 && len == PAGE_SIZE) ||
-                   (pos >= i_size_read(inode)) ||
-                   (pos_in_page == 0 && (pos + len) >= i_size_read(inode))) {
-                       zero_user_segments(page, 0, pos_in_page,
-                                          pos_in_page + len, PAGE_SIZE);
-                       break;
+               r = 0;
+               if (index == 0 && ci->i_inline_version != 1) {
+                       if (!PageUptodate(page)) {
+                               WARN_ONCE(1, "ceph: write_begin called on still-inlined inode (inline_version %llu)!\n",
+                                         ci->i_inline_version);
+                               r = -EINVAL;
+                       }
+                       goto out;
                }
-
-               /*
-                * We need to read it. If we get back -EINPROGRESS, then the page was
-                * handed off to fscache and it will be unlocked when the read completes.
-                * Refind the page in that case so we can reacquire the page lock. Otherwise
-                * we got a hard error or the read was completed synchronously.
-                */
-               r = ceph_do_readpage(file, page);
-               if (r != -EINPROGRESS)
-                       break;
+               zero_user_segment(page, 0, thp_size(page));
+               SetPageUptodate(page);
+               goto out;
        }
 
+       r = netfs_write_begin(file, inode->i_mapping, pos, len, 0, &page, NULL,
+                             &ceph_netfs_read_ops, NULL);
+out:
+       if (r == 0)
+               wait_on_page_fscache(page);
        if (r < 0) {
-               if (page) {
-                       unlock_page(page);
+               if (page)
                        put_page(page);
-               }
        } else {
+               WARN_ON_ONCE(!PageLocked(page));
                *pagep = page;
        }
        return r;
@@ -1438,7 +1325,7 @@ static ssize_t ceph_direct_io(struct kiocb *iocb, struct iov_iter *iter)
 
 const struct address_space_operations ceph_aops = {
        .readpage = ceph_readpage,
-       .readpages = ceph_readpages,
+       .readahead = ceph_readahead,
        .writepage = ceph_writepage,
        .writepages = ceph_writepages_start,
        .write_begin = ceph_write_begin,
@@ -1470,7 +1357,6 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
        struct inode *inode = file_inode(vma->vm_file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_file_info *fi = vma->vm_file->private_data;
-       struct page *pinned_page = NULL;
        loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT;
        int want, got, err;
        sigset_t oldset;
@@ -1478,21 +1364,20 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
 
        ceph_block_sigs(&oldset);
 
-       dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
-            inode, ceph_vinop(inode), off, (size_t)PAGE_SIZE);
+       dout("filemap_fault %p %llx.%llx %llu trying to get caps\n",
+            inode, ceph_vinop(inode), off);
        if (fi->fmode & CEPH_FILE_MODE_LAZY)
                want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
        else
                want = CEPH_CAP_FILE_CACHE;
 
        got = 0;
-       err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1,
-                           &got, &pinned_page);
+       err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1, &got);
        if (err < 0)
                goto out_restore;
 
-       dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
-            inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));
+       dout("filemap_fault %p %llu got cap refs on %s\n",
+            inode, off, ceph_cap_string(got));
 
        if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
            ci->i_inline_version == CEPH_INLINE_NONE) {
@@ -1500,14 +1385,11 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
                ceph_add_rw_context(fi, &rw_ctx);
                ret = filemap_fault(vmf);
                ceph_del_rw_context(fi, &rw_ctx);
-               dout("filemap_fault %p %llu~%zd drop cap refs %s ret %x\n",
-                       inode, off, (size_t)PAGE_SIZE,
-                               ceph_cap_string(got), ret);
+               dout("filemap_fault %p %llu drop cap refs %s ret %x\n",
+                    inode, off, ceph_cap_string(got), ret);
        } else
                err = -EAGAIN;
 
-       if (pinned_page)
-               put_page(pinned_page);
        ceph_put_cap_refs(ci, got);
 
        if (err != -EAGAIN)
@@ -1542,8 +1424,8 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
                vmf->page = page;
                ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
 out_inline:
-               dout("filemap_fault %p %llu~%zd read inline data ret %x\n",
-                    inode, off, (size_t)PAGE_SIZE, ret);
+               dout("filemap_fault %p %llu read inline data ret %x\n",
+                    inode, off, ret);
        }
 out_restore:
        ceph_restore_sigs(&oldset);
@@ -1553,9 +1435,6 @@ out_restore:
        return ret;
 }
 
-/*
- * Reuse write_begin here for simplicity.
- */
 static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
 {
        struct vm_area_struct *vma = vmf->vma;
@@ -1591,10 +1470,10 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
                        goto out_free;
        }
 
-       if (off + PAGE_SIZE <= size)
-               len = PAGE_SIZE;
+       if (off + thp_size(page) <= size)
+               len = thp_size(page);
        else
-               len = size & ~PAGE_MASK;
+               len = offset_in_thp(page, size);
 
        dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
             inode, ceph_vinop(inode), off, len, size);
@@ -1604,8 +1483,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
                want = CEPH_CAP_FILE_BUFFER;
 
        got = 0;
-       err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len,
-                           &got, NULL);
+       err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len, &got);
        if (err < 0)
                goto out_free;
 
@@ -1832,7 +1710,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
        if (!err)
                err = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
-       ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+       ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
                                  req->r_end_latency, err);
 
 out_put:
@@ -2057,6 +1935,10 @@ int ceph_pool_perm_check(struct inode *inode, int need)
        s64 pool;
        int ret, flags;
 
+       /* Only need to do this for regular files */
+       if (!S_ISREG(inode->i_mode))
+               return 0;
+
        if (ci->i_vino.snap != CEPH_NOSNAP) {
                /*
                 * Pool permission check needs to write to the first object.