Merge tag 'irq-urgent-2020-11-08' of git://git.kernel.org/pub/scm/linux/kernel/git...
[linux-2.6-microblaze.git] / fs / ceph / addr.c
index 6ea761c..35c83f6 100644 (file)
@@ -182,58 +182,15 @@ static int ceph_releasepage(struct page *page, gfp_t g)
        return !PagePrivate(page);
 }
 
-/*
- * Read some contiguous pages.  If we cross a stripe boundary, shorten
- * *plen.  Return number of bytes read, or error.
- */
-static int ceph_sync_readpages(struct ceph_fs_client *fsc,
-                              struct ceph_vino vino,
-                              struct ceph_file_layout *layout,
-                              u64 off, u64 *plen,
-                              u32 truncate_seq, u64 truncate_size,
-                              struct page **pages, int num_pages,
-                              int page_align)
-{
-       struct ceph_osd_client *osdc = &fsc->client->osdc;
-       struct ceph_osd_request *req;
-       int rc = 0;
-
-       dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
-            vino.snap, off, *plen);
-       req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
-                                   CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
-                                   NULL, truncate_seq, truncate_size,
-                                   false);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
-
-       /* it may be a short read due to an object boundary */
-       osd_req_op_extent_osd_data_pages(req, 0,
-                               pages, *plen, page_align, false, false);
-
-       dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
-            off, *plen, *plen, page_align);
-
-       rc = ceph_osdc_start_request(osdc, req, false);
-       if (!rc)
-               rc = ceph_osdc_wait_request(osdc, req);
-
-       ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
-                                req->r_end_latency, rc);
-
-       ceph_osdc_put_request(req);
-       dout("readpages result %d\n", rc);
-       return rc;
-}
-
-/*
- * read a single page, without unlocking it.
- */
+/* read a single page, without unlocking it. */
 static int ceph_do_readpage(struct file *filp, struct page *page)
 {
        struct inode *inode = file_inode(filp);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_osd_client *osdc = &fsc->client->osdc;
+       struct ceph_osd_request *req;
+       struct ceph_vino vino = ceph_vino(inode);
        int err = 0;
        u64 off = page_offset(page);
        u64 len = PAGE_SIZE;
@@ -260,19 +217,33 @@ static int ceph_do_readpage(struct file *filp, struct page *page)
        if (err == 0)
                return -EINPROGRESS;
 
-       dout("readpage inode %p file %p page %p index %lu\n",
-            inode, filp, page, page->index);
-       err = ceph_sync_readpages(fsc, ceph_vino(inode),
-                                 &ci->i_layout, off, &len,
-                                 ci->i_truncate_seq, ci->i_truncate_size,
-                                 &page, 1, 0);
+       dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
+            vino.ino, vino.snap, filp, off, len, page, page->index);
+       req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, 0, 1,
+                                   CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL,
+                                   ci->i_truncate_seq, ci->i_truncate_size,
+                                   false);
+       if (IS_ERR(req))
+               return PTR_ERR(req);
+
+       osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
+
+       err = ceph_osdc_start_request(osdc, req, false);
+       if (!err)
+               err = ceph_osdc_wait_request(osdc, req);
+
+       ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
+                                req->r_end_latency, err);
+
+       ceph_osdc_put_request(req);
+       dout("readpage result %d\n", err);
+
        if (err == -ENOENT)
                err = 0;
        if (err < 0) {
-               SetPageError(page);
                ceph_fscache_readpage_cancel(inode, page);
-               if (err == -EBLACKLISTED)
-                       fsc->blacklisted = true;
+               if (err == -EBLOCKLISTED)
+                       fsc->blocklisted = true;
                goto out;
        }
        if (err < PAGE_SIZE)
@@ -312,8 +283,8 @@ static void finish_read(struct ceph_osd_request *req)
        int i;
 
        dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
-       if (rc == -EBLACKLISTED)
-               ceph_inode_to_client(inode)->blacklisted = true;
+       if (rc == -EBLOCKLISTED)
+               ceph_inode_to_client(inode)->blocklisted = true;
 
        /* unlock all pages, zeroing any data we didn't read */
        osd_data = osd_req_op_extent_osd_data(req, 0);
@@ -619,50 +590,6 @@ static u64 get_writepages_data_length(struct inode *inode,
        return end > start ? end - start : 0;
 }
 
-/*
- * do a synchronous write on N pages
- */
-static int ceph_sync_writepages(struct ceph_fs_client *fsc,
-                               struct ceph_vino vino,
-                               struct ceph_file_layout *layout,
-                               struct ceph_snap_context *snapc,
-                               u64 off, u64 len,
-                               u32 truncate_seq, u64 truncate_size,
-                               struct timespec64 *mtime,
-                               struct page **pages, int num_pages)
-{
-       struct ceph_osd_client *osdc = &fsc->client->osdc;
-       struct ceph_osd_request *req;
-       int rc = 0;
-       int page_align = off & ~PAGE_MASK;
-
-       req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
-                                   CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
-                                   snapc, truncate_seq, truncate_size,
-                                   true);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
-
-       /* it may be a short write due to an object boundary */
-       osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
-                               false, false);
-       dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
-
-       req->r_mtime = *mtime;
-       rc = ceph_osdc_start_request(osdc, req, true);
-       if (!rc)
-               rc = ceph_osdc_wait_request(osdc, req);
-
-       ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
-                                 req->r_end_latency, rc);
-
-       ceph_osdc_put_request(req);
-       if (rc == 0)
-               rc = len;
-       dout("writepages result %d\n", rc);
-       return rc;
-}
-
 /*
  * Write a single page, but leave the page locked.
  *
@@ -671,20 +598,19 @@ static int ceph_sync_writepages(struct ceph_fs_client *fsc,
  */
 static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 {
-       struct inode *inode;
-       struct ceph_inode_info *ci;
-       struct ceph_fs_client *fsc;
+       struct inode *inode = page->mapping->host;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_snap_context *snapc, *oldest;
        loff_t page_off = page_offset(page);
-       int err, len = PAGE_SIZE;
+       int err;
+       loff_t len = PAGE_SIZE;
        struct ceph_writeback_ctl ceph_wbc;
+       struct ceph_osd_client *osdc = &fsc->client->osdc;
+       struct ceph_osd_request *req;
 
        dout("writepage %p idx %lu\n", page, page->index);
 
-       inode = page->mapping->host;
-       ci = ceph_inode(inode);
-       fsc = ceph_inode_to_client(inode);
-
        /* verify this is a writeable snap context */
        snapc = page_snap_context(page);
        if (!snapc) {
@@ -713,7 +639,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        if (ceph_wbc.i_size < page_off + len)
                len = ceph_wbc.i_size - page_off;
 
-       dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n",
+       dout("writepage %p page %p index %lu on %llu~%llu snapc %p seq %lld\n",
             inode, page, page->index, page_off, len, snapc, snapc->seq);
 
        if (atomic_long_inc_return(&fsc->writeback_count) >
@@ -721,11 +647,33 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
                set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
 
        set_page_writeback(page);
-       err = ceph_sync_writepages(fsc, ceph_vino(inode),
-                                  &ci->i_layout, snapc, page_off, len,
-                                  ceph_wbc.truncate_seq,
-                                  ceph_wbc.truncate_size,
-                                  &inode->i_mtime, &page, 1);
+       req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), page_off, &len, 0, 1,
+                                   CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, snapc,
+                                   ceph_wbc.truncate_seq, ceph_wbc.truncate_size,
+                                   true);
+       if (IS_ERR(req)) {
+               redirty_page_for_writepage(wbc, page);
+               end_page_writeback(page);
+               return PTR_ERR(req);
+       }
+
+       /* it may be a short write due to an object boundary */
+       WARN_ON_ONCE(len > PAGE_SIZE);
+       osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
+       dout("writepage %llu~%llu (%llu bytes)\n", page_off, len, len);
+
+       req->r_mtime = inode->i_mtime;
+       err = ceph_osdc_start_request(osdc, req, true);
+       if (!err)
+               err = ceph_osdc_wait_request(osdc, req);
+
+       ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+                                 req->r_end_latency, err);
+
+       ceph_osdc_put_request(req);
+       if (err == 0)
+               err = len;
+
        if (err < 0) {
                struct writeback_control tmp_wbc;
                if (!wbc)
@@ -737,8 +685,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
                        end_page_writeback(page);
                        return err;
                }
-               if (err == -EBLACKLISTED)
-                       fsc->blacklisted = true;
+               if (err == -EBLOCKLISTED)
+                       fsc->blocklisted = true;
                dout("writepage setting page/mapping error %d %p\n",
                     err, page);
                mapping_set_error(&inode->i_data, err);
@@ -801,8 +749,8 @@ static void writepages_finish(struct ceph_osd_request *req)
        if (rc < 0) {
                mapping_set_error(mapping, rc);
                ceph_set_error_write(ci);
-               if (rc == -EBLACKLISTED)
-                       fsc->blacklisted = true;
+               if (rc == -EBLOCKLISTED)
+                       fsc->blocklisted = true;
        } else {
                ceph_clear_error_write(ci);
        }
@@ -962,9 +910,8 @@ retry:
                max_pages = wsize >> PAGE_SHIFT;
 
 get_more_pages:
-               pvec_pages = pagevec_lookup_range_nr_tag(&pvec, mapping, &index,
-                                               end, PAGECACHE_TAG_DIRTY,
-                                               max_pages - locked_pages);
+               pvec_pages = pagevec_lookup_range_tag(&pvec, mapping, &index,
+                                               end, PAGECACHE_TAG_DIRTY);
                dout("pagevec_lookup_range_tag got %d\n", pvec_pages);
                if (!pvec_pages && !locked_pages)
                        break;
@@ -1299,110 +1246,60 @@ static int context_is_writeable_or_written(struct inode *inode,
        return ret;
 }
 
-/*
- * We are only allowed to write into/dirty the page if the page is
- * clean, or already dirty within the same snap context.
+/**
+ * ceph_find_incompatible - find an incompatible context and return it
+ * @page: page being dirtied
+ *
+ * We are only allowed to write into/dirty a page if the page is
+ * clean, or already dirty within the same snap context. Returns a
+ * conflicting context (with a reference held for the caller) if there
+ * is one, NULL if there isn't, or an ERR_PTR()-encoded error otherwise.
  *
- * called with page locked.
- * return success with page locked,
- * or any failure (incl -EAGAIN) with page unlocked.
+ * Must be called with page lock held.
  */
-static int ceph_update_writeable_page(struct file *file,
-                           loff_t pos, unsigned len,
-                           struct page *page)
+static struct ceph_snap_context *
+ceph_find_incompatible(struct page *page)
 {
-       struct inode *inode = file_inode(file);
+       struct inode *inode = page->mapping->host;
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_inode_info *ci = ceph_inode(inode);
-       loff_t page_off = pos & PAGE_MASK;
-       int pos_in_page = pos & ~PAGE_MASK;
-       int end_in_page = pos_in_page + len;
-       loff_t i_size;
-       int r;
-       struct ceph_snap_context *snapc, *oldest;
 
        if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
                dout(" page %p forced umount\n", page);
-               unlock_page(page);
-               return -EIO;
+               return ERR_PTR(-EIO);
        }
 
-retry_locked:
-       /* writepages currently holds page lock, but if we change that later, */
-       wait_on_page_writeback(page);
+       for (;;) {
+               struct ceph_snap_context *snapc, *oldest;
+
+               wait_on_page_writeback(page);
+
+               snapc = page_snap_context(page);
+               if (!snapc || snapc == ci->i_head_snapc)
+                       break;
 
-       snapc = page_snap_context(page);
-       if (snapc && snapc != ci->i_head_snapc) {
                /*
                 * this page is already dirty in another (older) snap
                 * context!  is it writeable now?
                 */
                oldest = get_oldest_context(inode, NULL, NULL);
                if (snapc->seq > oldest->seq) {
+                       /* not writeable -- return it for the caller to deal with */
                        ceph_put_snap_context(oldest);
-                       dout(" page %p snapc %p not current or oldest\n",
-                            page, snapc);
-                       /*
-                        * queue for writeback, and wait for snapc to
-                        * be writeable or written
-                        */
-                       snapc = ceph_get_snap_context(snapc);
-                       unlock_page(page);
-                       ceph_queue_writeback(inode);
-                       r = wait_event_killable(ci->i_cap_wq,
-                              context_is_writeable_or_written(inode, snapc));
-                       ceph_put_snap_context(snapc);
-                       if (r == -ERESTARTSYS)
-                               return r;
-                       return -EAGAIN;
+                       dout(" page %p snapc %p not current or oldest\n", page, snapc);
+                       return ceph_get_snap_context(snapc);
                }
                ceph_put_snap_context(oldest);
 
                /* yay, writeable, do it now (without dropping page lock) */
-               dout(" page %p snapc %p not current, but oldest\n",
-                    page, snapc);
-               if (!clear_page_dirty_for_io(page))
-                       goto retry_locked;
-               r = writepage_nounlock(page, NULL);
-               if (r < 0)
-                       goto fail_unlock;
-               goto retry_locked;
-       }
-
-       if (PageUptodate(page)) {
-               dout(" page %p already uptodate\n", page);
-               return 0;
-       }
-
-       /* full page? */
-       if (pos_in_page == 0 && len == PAGE_SIZE)
-               return 0;
-
-       /* past end of file? */
-       i_size = i_size_read(inode);
-
-       if (page_off >= i_size ||
-           (pos_in_page == 0 && (pos+len) >= i_size &&
-            end_in_page - pos_in_page != PAGE_SIZE)) {
-               dout(" zeroing %p 0 - %d and %d - %d\n",
-                    page, pos_in_page, end_in_page, (int)PAGE_SIZE);
-               zero_user_segments(page,
-                                  0, pos_in_page,
-                                  end_in_page, PAGE_SIZE);
-               return 0;
-       }
-
-       /* we need to read it. */
-       r = ceph_do_readpage(file, page);
-       if (r < 0) {
-               if (r == -EINPROGRESS)
-                       return -EAGAIN;
-               goto fail_unlock;
+               dout(" page %p snapc %p not current, but oldest\n", page, snapc);
+               if (clear_page_dirty_for_io(page)) {
+                       int r = writepage_nounlock(page, NULL);
+                       if (r < 0)
+                               return ERR_PTR(r);
+               }
        }
-       goto retry_locked;
-fail_unlock:
-       unlock_page(page);
-       return r;
+       return NULL;
 }
 
 /*
@@ -1414,26 +1311,78 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
                            struct page **pagep, void **fsdata)
 {
        struct inode *inode = file_inode(file);
-       struct page *page;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_snap_context *snapc;
+       struct page *page = NULL;
        pgoff_t index = pos >> PAGE_SHIFT;
-       int r;
+       int pos_in_page = pos & ~PAGE_MASK;
+       int r = 0;
 
-       do {
-               /* get a page */
-               page = grab_cache_page_write_begin(mapping, index, 0);
-               if (!page)
-                       return -ENOMEM;
+       dout("write_begin file %p inode %p page %p %d~%d\n", file, inode, page, (int)pos, (int)len);
 
-               dout("write_begin file %p inode %p page %p %d~%d\n", file,
-                    inode, page, (int)pos, (int)len);
+       for (;;) {
+               page = grab_cache_page_write_begin(mapping, index, 0);
+               if (!page) {
+                       r = -ENOMEM;
+                       break;
+               }
 
-               r = ceph_update_writeable_page(file, pos, len, page);
-               if (r < 0)
+               snapc = ceph_find_incompatible(page);
+               if (snapc) {
+                       if (IS_ERR(snapc)) {
+                               r = PTR_ERR(snapc);
+                               break;
+                       }
+                       unlock_page(page);
                        put_page(page);
-               else
-                       *pagep = page;
-       } while (r == -EAGAIN);
+                       page = NULL;
+                       ceph_queue_writeback(inode);
+                       r = wait_event_killable(ci->i_cap_wq,
+                                               context_is_writeable_or_written(inode, snapc));
+                       ceph_put_snap_context(snapc);
+                       if (r != 0)
+                               break;
+                       continue;
+               }
+
+               if (PageUptodate(page)) {
+                       dout(" page %p already uptodate\n", page);
+                       break;
+               }
+
+               /*
+                * In some cases we don't need to read at all:
+                * - full page write
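+                * - write that lies completely beyond EOF (NOTE(review): a page straddling EOF still has data before EOF -- confirm skipping the read is safe)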
+                * - write that covers the page from start to EOF or beyond it
+                */
+               if ((pos_in_page == 0 && len == PAGE_SIZE) ||
+                   (pos >= i_size_read(inode)) ||
+                   (pos_in_page == 0 && (pos + len) >= i_size_read(inode))) {
+                       zero_user_segments(page, 0, pos_in_page,
+                                          pos_in_page + len, PAGE_SIZE);
+                       break;
+               }
 
+               /*
+                * We need to read it. If we get back -EINPROGRESS, then the page was
+                * handed off to fscache and it will be unlocked when the read completes.
+                * Refind the page in that case so we can reacquire the page lock. Otherwise
+                * we got a hard error or the read was completed synchronously.
+                */
+               r = ceph_do_readpage(file, page);
+               if (r != -EINPROGRESS)
+                       break;
+       }
+
+       if (r < 0) {
+               if (page) {
+                       unlock_page(page);
+                       put_page(page);
+               }
+       } else {
+               *pagep = page;
+       }
        return r;
 }
 
@@ -1522,7 +1471,7 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_file_info *fi = vma->vm_file->private_data;
        struct page *pinned_page = NULL;
-       loff_t off = vmf->pgoff << PAGE_SHIFT;
+       loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT;
        int want, got, err;
        sigset_t oldset;
        vm_fault_t ret = VM_FAULT_SIGBUS;
@@ -1668,6 +1617,8 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
        inode_inc_iversion_raw(inode);
 
        do {
+               struct ceph_snap_context *snapc;
+
                lock_page(page);
 
                if (page_mkwrite_check_truncate(page, inode) < 0) {
@@ -1676,13 +1627,26 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
                        break;
                }
 
-               err = ceph_update_writeable_page(vma->vm_file, off, len, page);
-               if (err >= 0) {
+               snapc = ceph_find_incompatible(page);
+               if (!snapc) {
                        /* success.  we'll keep the page locked. */
                        set_page_dirty(page);
                        ret = VM_FAULT_LOCKED;
+                       break;
                }
-       } while (err == -EAGAIN);
+
+               unlock_page(page);
+
+               if (IS_ERR(snapc)) {
+                       ret = VM_FAULT_SIGBUS;
+                       break;
+               }
+
+               ceph_queue_writeback(inode);
+               err = wait_event_killable(ci->i_cap_wq,
+                               context_is_writeable_or_written(inode, snapc));
+               ceph_put_snap_context(snapc);
+       } while (err == 0);
 
        if (ret == VM_FAULT_LOCKED ||
            ci->i_inline_version != CEPH_INLINE_NONE) {
@@ -2039,16 +2003,16 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
        if (err >= 0 || err == -ENOENT)
                have |= POOL_READ;
        else if (err != -EPERM) {
-               if (err == -EBLACKLISTED)
-                       fsc->blacklisted = true;
+               if (err == -EBLOCKLISTED)
+                       fsc->blocklisted = true;
                goto out_unlock;
        }
 
        if (err2 == 0 || err2 == -EEXIST)
                have |= POOL_WRITE;
        else if (err2 != -EPERM) {
-               if (err2 == -EBLACKLISTED)
-                       fsc->blacklisted = true;
+               if (err2 == -EBLOCKLISTED)
+                       fsc->blocklisted = true;
                err = err2;
                goto out_unlock;
        }