return !PagePrivate(page);
}
-/*
- * Read some contiguous pages. If we cross a stripe boundary, shorten
- * *plen. Return number of bytes read, or error.
- */
-static int ceph_sync_readpages(struct ceph_fs_client *fsc,
- struct ceph_vino vino,
- struct ceph_file_layout *layout,
- u64 off, u64 *plen,
- u32 truncate_seq, u64 truncate_size,
- struct page **pages, int num_pages,
- int page_align)
-{
- struct ceph_osd_client *osdc = &fsc->client->osdc;
- struct ceph_osd_request *req;
- int rc = 0;
-
- dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
- vino.snap, off, *plen);
- req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
- CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
- NULL, truncate_seq, truncate_size,
- false);
- if (IS_ERR(req))
- return PTR_ERR(req);
-
- /* it may be a short read due to an object boundary */
- osd_req_op_extent_osd_data_pages(req, 0,
- pages, *plen, page_align, false, false);
-
- dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
- off, *plen, *plen, page_align);
-
- rc = ceph_osdc_start_request(osdc, req, false);
- if (!rc)
- rc = ceph_osdc_wait_request(osdc, req);
-
- ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
- req->r_end_latency, rc);
-
- ceph_osdc_put_request(req);
- dout("readpages result %d\n", rc);
- return rc;
-}
-
-/*
- * read a single page, without unlocking it.
- */
+/* read a single page, without unlocking it. */
static int ceph_do_readpage(struct file *filp, struct page *page)
{
struct inode *inode = file_inode(filp);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+ struct ceph_osd_client *osdc = &fsc->client->osdc;
+ struct ceph_osd_request *req;
+ struct ceph_vino vino = ceph_vino(inode);
int err = 0;
u64 off = page_offset(page);
u64 len = PAGE_SIZE;
if (err == 0)
return -EINPROGRESS;
- dout("readpage inode %p file %p page %p index %lu\n",
- inode, filp, page, page->index);
- err = ceph_sync_readpages(fsc, ceph_vino(inode),
- &ci->i_layout, off, &len,
- ci->i_truncate_seq, ci->i_truncate_size,
- &page, 1, 0);
+ dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
+ vino.ino, vino.snap, filp, off, len, page, page->index);
+ req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, 0, 1,
+ CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL,
+ ci->i_truncate_seq, ci->i_truncate_size,
+ false);
+ if (IS_ERR(req))
+ return PTR_ERR(req);
+
+ osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
+
+ err = ceph_osdc_start_request(osdc, req, false);
+ if (!err)
+ err = ceph_osdc_wait_request(osdc, req);
+
+ ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, err);
+
+ ceph_osdc_put_request(req);
+ dout("readpage result %d\n", err);
+
if (err == -ENOENT)
err = 0;
if (err < 0) {
- SetPageError(page);
ceph_fscache_readpage_cancel(inode, page);
if (err == -EBLOCKLISTED)
fsc->blocklisted = true;
return end > start ? end - start : 0;
}
-/*
- * do a synchronous write on N pages
- */
-static int ceph_sync_writepages(struct ceph_fs_client *fsc,
- struct ceph_vino vino,
- struct ceph_file_layout *layout,
- struct ceph_snap_context *snapc,
- u64 off, u64 len,
- u32 truncate_seq, u64 truncate_size,
- struct timespec64 *mtime,
- struct page **pages, int num_pages)
-{
- struct ceph_osd_client *osdc = &fsc->client->osdc;
- struct ceph_osd_request *req;
- int rc = 0;
- int page_align = off & ~PAGE_MASK;
-
- req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
- CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
- snapc, truncate_seq, truncate_size,
- true);
- if (IS_ERR(req))
- return PTR_ERR(req);
-
- /* it may be a short write due to an object boundary */
- osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
- false, false);
- dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
-
- req->r_mtime = *mtime;
- rc = ceph_osdc_start_request(osdc, req, true);
- if (!rc)
- rc = ceph_osdc_wait_request(osdc, req);
-
- ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
- req->r_end_latency, rc);
-
- ceph_osdc_put_request(req);
- if (rc == 0)
- rc = len;
- dout("writepages result %d\n", rc);
- return rc;
-}
-
/*
* Write a single page, but leave the page locked.
*
*/
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
- struct inode *inode;
- struct ceph_inode_info *ci;
- struct ceph_fs_client *fsc;
+ struct inode *inode = page->mapping->host;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_snap_context *snapc, *oldest;
loff_t page_off = page_offset(page);
- int err, len = PAGE_SIZE;
+ int err;
+ loff_t len = PAGE_SIZE;
struct ceph_writeback_ctl ceph_wbc;
+ struct ceph_osd_client *osdc = &fsc->client->osdc;
+ struct ceph_osd_request *req;
dout("writepage %p idx %lu\n", page, page->index);
- inode = page->mapping->host;
- ci = ceph_inode(inode);
- fsc = ceph_inode_to_client(inode);
-
/* verify this is a writeable snap context */
snapc = page_snap_context(page);
if (!snapc) {
if (ceph_wbc.i_size < page_off + len)
len = ceph_wbc.i_size - page_off;
- dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n",
+ dout("writepage %p page %p index %lu on %llu~%llu snapc %p seq %lld\n",
inode, page, page->index, page_off, len, snapc, snapc->seq);
if (atomic_long_inc_return(&fsc->writeback_count) >
set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
set_page_writeback(page);
- err = ceph_sync_writepages(fsc, ceph_vino(inode),
- &ci->i_layout, snapc, page_off, len,
- ceph_wbc.truncate_seq,
- ceph_wbc.truncate_size,
- &inode->i_mtime, &page, 1);
+ req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), page_off, &len, 0, 1,
+ CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, snapc,
+ ceph_wbc.truncate_seq, ceph_wbc.truncate_size,
+ true);
+ if (IS_ERR(req)) {
+ redirty_page_for_writepage(wbc, page);
+ end_page_writeback(page);
+ return PTR_ERR(req);
+ }
+
+ /* it may be a short write due to an object boundary */
+ WARN_ON_ONCE(len > PAGE_SIZE);
+ osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
+ dout("writepage %llu~%llu (%llu bytes)\n", page_off, len, len);
+
+ req->r_mtime = inode->i_mtime;
+ err = ceph_osdc_start_request(osdc, req, true);
+ if (!err)
+ err = ceph_osdc_wait_request(osdc, req);
+
+ ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+ req->r_end_latency, err);
+
+ ceph_osdc_put_request(req);
+ if (err == 0)
+ err = len;
+
if (err < 0) {
struct writeback_control tmp_wbc;
if (!wbc)
/**
* ceph_find_incompatible - find an incompatible context and return it
- * @inode: inode associated with page
* @page: page being dirtied
*
* We are only allowed to write into/dirty a page if the page is
* Must be called with page lock held.
*/
static struct ceph_snap_context *
-ceph_find_incompatible(struct inode *inode, struct page *page)
+ceph_find_incompatible(struct page *page)
{
+ struct inode *inode = page->mapping->host;
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_inode_info *ci = ceph_inode(inode);
/*
* We are only allowed to write into/dirty the page if the page is
* clean, or already dirty within the same snap context.
- *
- * called with page locked.
- * return success with page locked,
- * or any failure (incl -EAGAIN) with page unlocked.
*/
-static int ceph_update_writeable_page(struct file *file,
- loff_t pos, unsigned len,
- struct page *page)
+static int ceph_write_begin(struct file *file, struct address_space *mapping,
+ loff_t pos, unsigned len, unsigned flags,
+ struct page **pagep, void **fsdata)
{
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_snap_context *snapc;
- loff_t page_off = pos & PAGE_MASK;
+ struct page *page = NULL;
+ pgoff_t index = pos >> PAGE_SHIFT;
int pos_in_page = pos & ~PAGE_MASK;
- int end_in_page = pos_in_page + len;
- loff_t i_size;
- int r;
+ int r = 0;
-retry_locked:
- snapc = ceph_find_incompatible(inode, page);
- if (snapc) {
- if (IS_ERR(snapc)) {
- r = PTR_ERR(snapc);
- goto fail_unlock;
+ dout("write_begin file %p inode %p page %p %d~%d\n", file, inode, page, (int)pos, (int)len);
+
+ for (;;) {
+ page = grab_cache_page_write_begin(mapping, index, 0);
+ if (!page) {
+ r = -ENOMEM;
+ break;
}
- unlock_page(page);
- ceph_queue_writeback(inode);
- r = wait_event_killable(ci->i_cap_wq,
- context_is_writeable_or_written(inode, snapc));
- ceph_put_snap_context(snapc);
- return -EAGAIN;
- }
- if (PageUptodate(page)) {
- dout(" page %p already uptodate\n", page);
- return 0;
- }
+ snapc = ceph_find_incompatible(page);
+ if (snapc) {
+ if (IS_ERR(snapc)) {
+ r = PTR_ERR(snapc);
+ break;
+ }
+ unlock_page(page);
+ put_page(page);
+ page = NULL;
+ ceph_queue_writeback(inode);
+ r = wait_event_killable(ci->i_cap_wq,
+ context_is_writeable_or_written(inode, snapc));
+ ceph_put_snap_context(snapc);
+ if (r != 0)
+ break;
+ continue;
+ }
- /* full page? */
- if (pos_in_page == 0 && len == PAGE_SIZE)
- return 0;
+ if (PageUptodate(page)) {
+ dout(" page %p already uptodate\n", page);
+ break;
+ }
- /* past end of file? */
- i_size = i_size_read(inode);
-
- if (page_off >= i_size ||
- (pos_in_page == 0 && (pos+len) >= i_size &&
- end_in_page - pos_in_page != PAGE_SIZE)) {
- dout(" zeroing %p 0 - %d and %d - %d\n",
- page, pos_in_page, end_in_page, (int)PAGE_SIZE);
- zero_user_segments(page,
- 0, pos_in_page,
- end_in_page, PAGE_SIZE);
- return 0;
- }
+ /*
+ * In some cases we don't need to read at all:
+ * - full page write
+ * - write that lies completely beyond EOF
+ * - write that covers the the page from start to EOF or beyond it
+ */
+ if ((pos_in_page == 0 && len == PAGE_SIZE) ||
+ (pos >= i_size_read(inode)) ||
+ (pos_in_page == 0 && (pos + len) >= i_size_read(inode))) {
+ zero_user_segments(page, 0, pos_in_page,
+ pos_in_page + len, PAGE_SIZE);
+ break;
+ }
- /* we need to read it. */
- r = ceph_do_readpage(file, page);
- if (r < 0) {
- if (r == -EINPROGRESS)
- return -EAGAIN;
- goto fail_unlock;
+ /*
+ * We need to read it. If we get back -EINPROGRESS, then the page was
+ * handed off to fscache and it will be unlocked when the read completes.
+ * Refind the page in that case so we can reacquire the page lock. Otherwise
+ * we got a hard error or the read was completed synchronously.
+ */
+ r = ceph_do_readpage(file, page);
+ if (r != -EINPROGRESS)
+ break;
}
- goto retry_locked;
-fail_unlock:
- unlock_page(page);
- return r;
-}
-
-/*
- * We are only allowed to write into/dirty the page if the page is
- * clean, or already dirty within the same snap context.
- */
-static int ceph_write_begin(struct file *file, struct address_space *mapping,
- loff_t pos, unsigned len, unsigned flags,
- struct page **pagep, void **fsdata)
-{
- struct inode *inode = file_inode(file);
- struct page *page;
- pgoff_t index = pos >> PAGE_SHIFT;
- int r;
-
- do {
- /* get a page */
- page = grab_cache_page_write_begin(mapping, index, 0);
- if (!page)
- return -ENOMEM;
-
- dout("write_begin file %p inode %p page %p %d~%d\n", file,
- inode, page, (int)pos, (int)len);
- r = ceph_update_writeable_page(file, pos, len, page);
- if (r < 0)
+ if (r < 0) {
+ if (page) {
+ unlock_page(page);
put_page(page);
- else
- *pagep = page;
- } while (r == -EAGAIN);
-
+ }
+ } else {
+ *pagep = page;
+ }
return r;
}
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *fi = vma->vm_file->private_data;
struct page *pinned_page = NULL;
- loff_t off = vmf->pgoff << PAGE_SHIFT;
+ loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT;
int want, got, err;
sigset_t oldset;
vm_fault_t ret = VM_FAULT_SIGBUS;
inode_inc_iversion_raw(inode);
do {
+ struct ceph_snap_context *snapc;
+
lock_page(page);
if (page_mkwrite_check_truncate(page, inode) < 0) {
break;
}
- err = ceph_update_writeable_page(vma->vm_file, off, len, page);
- if (err >= 0) {
+ snapc = ceph_find_incompatible(page);
+ if (!snapc) {
/* success. we'll keep the page locked. */
set_page_dirty(page);
ret = VM_FAULT_LOCKED;
+ break;
+ }
+
+ unlock_page(page);
+
+ if (IS_ERR(snapc)) {
+ ret = VM_FAULT_SIGBUS;
+ break;
}
- } while (err == -EAGAIN);
+
+ ceph_queue_writeback(inode);
+ err = wait_event_killable(ci->i_cap_wq,
+ context_is_writeable_or_written(inode, snapc));
+ ceph_put_snap_context(snapc);
+ } while (err == 0);
if (ret == VM_FAULT_LOCKED ||
ci->i_inline_version != CEPH_INLINE_NONE) {