Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
author Linus Torvalds <torvalds@linux-foundation.org>
Wed, 15 Oct 2014 04:46:01 +0000 (06:46 +0200)
committer Linus Torvalds <torvalds@linux-foundation.org>
Wed, 15 Oct 2014 04:46:01 +0000 (06:46 +0200)
Pull Ceph updates from Sage Weil:
 "There is the long-awaited discard support for RBD (Guangliang Zhao,
  Josh Durgin), a pile of RBD bug fixes that didn't belong in late -rc's
  (Ilya Dryomov, Li RongQing), a pile of fs/ceph bug fixes and
  performance and debugging improvements (Yan, Zheng, John Spray), and a
  smattering of cleanups (Chao Yu, Fabian Frederick, Joe Perches)"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (40 commits)
  ceph: fix divide-by-zero in __validate_layout()
  rbd: rbd workqueues need a resque worker
  libceph: ceph-msgr workqueue needs a resque worker
  ceph: fix bool assignments
  libceph: separate multiple ops with commas in debugfs output
  libceph: sync osd op definitions in rados.h
  libceph: remove redundant declaration
  ceph: additional debugfs output
  ceph: export ceph_session_state_name function
  ceph: include the initial ACL in create/mkdir/mknod MDS requests
  ceph: use pagelist to present MDS request data
  libceph: reference counting pagelist
  ceph: fix llistxattr on symlink
  ceph: send client metadata to MDS
  ceph: remove redundant code for max file size verification
  ceph: remove redundant io_iter_advance()
  ceph: move ceph_find_inode() outside the s_mutex
  ceph: request xattrs if xattr_version is zero
  rbd: set the remaining discard properties to enable support
  rbd: use helpers to handle discard for layered images correctly
  ...

25 files changed:
drivers/block/rbd.c
fs/ceph/acl.c
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/debugfs.c
fs/ceph/dir.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/ioctl.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.h
fs/ceph/xattr.c
include/linux/ceph/libceph.h
include/linux/ceph/pagelist.h
include/linux/ceph/rados.h
net/ceph/Kconfig
net/ceph/ceph_common.c
net/ceph/ceph_strings.c
net/ceph/debugfs.c
net/ceph/messenger.c
net/ceph/mon_client.c
net/ceph/osd_client.c
net/ceph/osdmap.c
net/ceph/pagelist.c

index 4b97baf..0a54c58 100644 (file)
@@ -210,6 +210,12 @@ enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
 };
 
+enum obj_operation_type {
+       OBJ_OP_WRITE,
+       OBJ_OP_READ,
+       OBJ_OP_DISCARD,
+};
+
 enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,       /* object usage: standalone = 0, image = 1 */
@@ -276,6 +282,7 @@ enum img_req_flags {
        IMG_REQ_WRITE,          /* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
+       IMG_REQ_DISCARD,        /* discard: normal = 0, discard request = 1 */
 };
 
 struct rbd_img_request {
@@ -785,6 +792,20 @@ static int parse_rbd_opts_token(char *c, void *private)
        return 0;
 }
 
+static char* obj_op_name(enum obj_operation_type op_type)
+{
+       switch (op_type) {
+       case OBJ_OP_READ:
+               return "read";
+       case OBJ_OP_WRITE:
+               return "write";
+       case OBJ_OP_DISCARD:
+               return "discard";
+       default:
+               return "???";
+       }
+}
+
 /*
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.  Either way, ceph_opts is consumed by this
@@ -1600,6 +1621,21 @@ static bool img_request_write_test(struct rbd_img_request *img_request)
        return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
 }
 
+/*
+ * Set the discard flag when the img_request is a discard request
+ */
+static void img_request_discard_set(struct rbd_img_request *img_request)
+{
+       set_bit(IMG_REQ_DISCARD, &img_request->flags);
+       smp_mb();
+}
+
+static bool img_request_discard_test(struct rbd_img_request *img_request)
+{
+       smp_mb();
+       return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
+}
+
 static void img_request_child_set(struct rbd_img_request *img_request)
 {
        set_bit(IMG_REQ_CHILD, &img_request->flags);
@@ -1636,6 +1672,17 @@ static bool img_request_layered_test(struct rbd_img_request *img_request)
        return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
 }
 
+static enum obj_operation_type
+rbd_img_request_op_type(struct rbd_img_request *img_request)
+{
+       if (img_request_write_test(img_request))
+               return OBJ_OP_WRITE;
+       else if (img_request_discard_test(img_request))
+               return OBJ_OP_DISCARD;
+       else
+               return OBJ_OP_READ;
+}
+
 static void
 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
 {
@@ -1722,6 +1769,21 @@ static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
        obj_request_done_set(obj_request);
 }
 
+static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
+{
+       dout("%s: obj %p result %d %llu\n", __func__, obj_request,
+               obj_request->result, obj_request->length);
+       /*
+        * There is no such thing as a successful short discard.  Set
+        * it to our originally-requested length.
+        */
+       obj_request->xferred = obj_request->length;
+       /* discarding a non-existent object is not a problem */
+       if (obj_request->result == -ENOENT)
+               obj_request->result = 0;
+       obj_request_done_set(obj_request);
+}
+
 /*
  * For a simple stat call there's nothing to do.  We'll do more if
  * this is part of a write sequence for a layered image.
@@ -1773,6 +1835,11 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
        case CEPH_OSD_OP_STAT:
                rbd_osd_stat_callback(obj_request);
                break;
+       case CEPH_OSD_OP_DELETE:
+       case CEPH_OSD_OP_TRUNCATE:
+       case CEPH_OSD_OP_ZERO:
+               rbd_osd_discard_callback(obj_request);
+               break;
        case CEPH_OSD_OP_CALL:
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
@@ -1823,7 +1890,7 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
  */
 static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
-                                       bool write_request,
+                                       enum obj_operation_type op_type,
                                        unsigned int num_ops,
                                        struct rbd_obj_request *obj_request)
 {
@@ -1831,16 +1898,18 @@ static struct ceph_osd_request *rbd_osd_req_create(
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;
 
-       if (obj_request_img_data_test(obj_request)) {
+       if (obj_request_img_data_test(obj_request) &&
+               (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
                struct rbd_img_request *img_request = obj_request->img_request;
-
-               rbd_assert(write_request ==
-                               img_request_write_test(img_request));
-               if (write_request)
-                       snapc = img_request->snapc;
+               if (op_type == OBJ_OP_WRITE) {
+                       rbd_assert(img_request_write_test(img_request));
+               } else {
+                       rbd_assert(img_request_discard_test(img_request));
+               }
+               snapc = img_request->snapc;
        }
 
-       rbd_assert(num_ops == 1 || (write_request && num_ops == 2));
+       rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
 
        /* Allocate and initialize the request, for the num_ops ops */
 
@@ -1850,7 +1919,7 @@ static struct ceph_osd_request *rbd_osd_req_create(
        if (!osd_req)
                return NULL;    /* ENOMEM */
 
-       if (write_request)
+       if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
                osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
        else
                osd_req->r_flags = CEPH_OSD_FLAG_READ;
@@ -1865,9 +1934,10 @@ static struct ceph_osd_request *rbd_osd_req_create(
 }
 
 /*
- * Create a copyup osd request based on the information in the
- * object request supplied.  A copyup request has three osd ops,
- * a copyup method call, a hint op, and a write op.
+ * Create a copyup osd request based on the information in the object
+ * request supplied.  A copyup request has two or three osd ops, a
+ * copyup method call, potentially a hint op, and a write or truncate
+ * or zero op.
  */
 static struct ceph_osd_request *
 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
@@ -1877,18 +1947,24 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
        struct rbd_device *rbd_dev;
        struct ceph_osd_client *osdc;
        struct ceph_osd_request *osd_req;
+       int num_osd_ops = 3;
 
        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;
        rbd_assert(img_request);
-       rbd_assert(img_request_write_test(img_request));
+       rbd_assert(img_request_write_test(img_request) ||
+                       img_request_discard_test(img_request));
+
+       if (img_request_discard_test(img_request))
+               num_osd_ops = 2;
 
-       /* Allocate and initialize the request, for the three ops */
+       /* Allocate and initialize the request, for all the ops */
 
        snapc = img_request->snapc;
        rbd_dev = img_request->rbd_dev;
        osdc = &rbd_dev->rbd_client->client->osdc;
-       osd_req = ceph_osdc_alloc_request(osdc, snapc, 3, false, GFP_ATOMIC);
+       osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
+                                               false, GFP_ATOMIC);
        if (!osd_req)
                return NULL;    /* ENOMEM */
 
@@ -2057,7 +2133,8 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
 static struct rbd_img_request *rbd_img_request_create(
                                        struct rbd_device *rbd_dev,
                                        u64 offset, u64 length,
-                                       bool write_request)
+                                       enum obj_operation_type op_type,
+                                       struct ceph_snap_context *snapc)
 {
        struct rbd_img_request *img_request;
 
@@ -2065,20 +2142,17 @@ static struct rbd_img_request *rbd_img_request_create(
        if (!img_request)
                return NULL;
 
-       if (write_request) {
-               down_read(&rbd_dev->header_rwsem);
-               ceph_get_snap_context(rbd_dev->header.snapc);
-               up_read(&rbd_dev->header_rwsem);
-       }
-
        img_request->rq = NULL;
        img_request->rbd_dev = rbd_dev;
        img_request->offset = offset;
        img_request->length = length;
        img_request->flags = 0;
-       if (write_request) {
+       if (op_type == OBJ_OP_DISCARD) {
+               img_request_discard_set(img_request);
+               img_request->snapc = snapc;
+       } else if (op_type == OBJ_OP_WRITE) {
                img_request_write_set(img_request);
-               img_request->snapc = rbd_dev->header.snapc;
+               img_request->snapc = snapc;
        } else {
                img_request->snap_id = rbd_dev->spec->snap_id;
        }
@@ -2093,8 +2167,7 @@ static struct rbd_img_request *rbd_img_request_create(
        kref_init(&img_request->kref);
 
        dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
-               write_request ? "write" : "read", offset, length,
-               img_request);
+               obj_op_name(op_type), offset, length, img_request);
 
        return img_request;
 }
@@ -2118,7 +2191,8 @@ static void rbd_img_request_destroy(struct kref *kref)
                rbd_dev_parent_put(img_request->rbd_dev);
        }
 
-       if (img_request_write_test(img_request))
+       if (img_request_write_test(img_request) ||
+               img_request_discard_test(img_request))
                ceph_put_snap_context(img_request->snapc);
 
        kmem_cache_free(rbd_img_request_cache, img_request);
@@ -2134,8 +2208,8 @@ static struct rbd_img_request *rbd_parent_request_create(
        rbd_assert(obj_request->img_request);
        rbd_dev = obj_request->img_request->rbd_dev;
 
-       parent_request = rbd_img_request_create(rbd_dev->parent,
-                                               img_offset, length, false);
+       parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
+                                               length, OBJ_OP_READ, NULL);
        if (!parent_request)
                return NULL;
 
@@ -2176,11 +2250,18 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
        result = obj_request->result;
        if (result) {
                struct rbd_device *rbd_dev = img_request->rbd_dev;
+               enum obj_operation_type op_type;
+
+               if (img_request_discard_test(img_request))
+                       op_type = OBJ_OP_DISCARD;
+               else if (img_request_write_test(img_request))
+                       op_type = OBJ_OP_WRITE;
+               else
+                       op_type = OBJ_OP_READ;
 
                rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
-                       img_request_write_test(img_request) ? "write" : "read",
-                       obj_request->length, obj_request->img_offset,
-                       obj_request->offset);
+                       obj_op_name(op_type), obj_request->length,
+                       obj_request->img_offset, obj_request->offset);
                rbd_warn(rbd_dev, "  result %d xferred %x",
                        result, xferred);
                if (!img_request->result)
@@ -2244,6 +2325,67 @@ out:
                rbd_img_request_complete(img_request);
 }
 
+/*
+ * Add individual osd ops to the given ceph_osd_request and prepare
+ * them for submission. num_ops is the current number of
+ * osd operations already added to the object request.
+ */
+static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
+                               struct ceph_osd_request *osd_request,
+                               enum obj_operation_type op_type,
+                               unsigned int num_ops)
+{
+       struct rbd_img_request *img_request = obj_request->img_request;
+       struct rbd_device *rbd_dev = img_request->rbd_dev;
+       u64 object_size = rbd_obj_bytes(&rbd_dev->header);
+       u64 offset = obj_request->offset;
+       u64 length = obj_request->length;
+       u64 img_end;
+       u16 opcode;
+
+       if (op_type == OBJ_OP_DISCARD) {
+               if (!offset && length == object_size &&
+                   (!img_request_layered_test(img_request) ||
+                    !obj_request_overlaps_parent(obj_request))) {
+                       opcode = CEPH_OSD_OP_DELETE;
+               } else if ((offset + length == object_size)) {
+                       opcode = CEPH_OSD_OP_TRUNCATE;
+               } else {
+                       down_read(&rbd_dev->header_rwsem);
+                       img_end = rbd_dev->header.image_size;
+                       up_read(&rbd_dev->header_rwsem);
+
+                       if (obj_request->img_offset + length == img_end)
+                               opcode = CEPH_OSD_OP_TRUNCATE;
+                       else
+                               opcode = CEPH_OSD_OP_ZERO;
+               }
+       } else if (op_type == OBJ_OP_WRITE) {
+               opcode = CEPH_OSD_OP_WRITE;
+               osd_req_op_alloc_hint_init(osd_request, num_ops,
+                                       object_size, object_size);
+               num_ops++;
+       } else {
+               opcode = CEPH_OSD_OP_READ;
+       }
+
+       osd_req_op_extent_init(osd_request, num_ops, opcode, offset, length,
+                               0, 0);
+       if (obj_request->type == OBJ_REQUEST_BIO)
+               osd_req_op_extent_osd_data_bio(osd_request, num_ops,
+                                       obj_request->bio_list, length);
+       else if (obj_request->type == OBJ_REQUEST_PAGES)
+               osd_req_op_extent_osd_data_pages(osd_request, num_ops,
+                                       obj_request->pages, length,
+                                       offset & ~PAGE_MASK, false, false);
+
+       /* Discards are also writes */
+       if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
+               rbd_osd_req_format_write(obj_request);
+       else
+               rbd_osd_req_format_read(obj_request);
+}
+
 /*
  * Split up an image request into one or more object requests, each
  * to a different object.  The "type" parameter indicates whether
@@ -2259,28 +2401,26 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
        struct rbd_device *rbd_dev = img_request->rbd_dev;
        struct rbd_obj_request *obj_request = NULL;
        struct rbd_obj_request *next_obj_request;
-       bool write_request = img_request_write_test(img_request);
        struct bio *bio_list = NULL;
        unsigned int bio_offset = 0;
        struct page **pages = NULL;
+       enum obj_operation_type op_type;
        u64 img_offset;
        u64 resid;
-       u16 opcode;
 
        dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
                (int)type, data_desc);
 
-       opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
        img_offset = img_request->offset;
        resid = img_request->length;
        rbd_assert(resid > 0);
+       op_type = rbd_img_request_op_type(img_request);
 
        if (type == OBJ_REQUEST_BIO) {
                bio_list = data_desc;
                rbd_assert(img_offset ==
                           bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
-       } else {
-               rbd_assert(type == OBJ_REQUEST_PAGES);
+       } else if (type == OBJ_REQUEST_PAGES) {
                pages = data_desc;
        }
 
@@ -2289,7 +2429,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                const char *object_name;
                u64 offset;
                u64 length;
-               unsigned int which = 0;
 
                object_name = rbd_segment_name(rbd_dev, img_offset);
                if (!object_name)
@@ -2321,7 +2460,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                                                                GFP_ATOMIC);
                        if (!obj_request->bio_list)
                                goto out_unwind;
-               } else {
+               } else if (type == OBJ_REQUEST_PAGES) {
                        unsigned int page_count;
 
                        obj_request->pages = pages;
@@ -2332,38 +2471,19 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                        pages += page_count;
                }
 
-               osd_req = rbd_osd_req_create(rbd_dev, write_request,
-                                            (write_request ? 2 : 1),
-                                            obj_request);
+               osd_req = rbd_osd_req_create(rbd_dev, op_type,
+                                       (op_type == OBJ_OP_WRITE) ? 2 : 1,
+                                       obj_request);
                if (!osd_req)
                        goto out_unwind;
+
                obj_request->osd_req = osd_req;
                obj_request->callback = rbd_img_obj_callback;
-               rbd_img_request_get(img_request);
-
-               if (write_request) {
-                       osd_req_op_alloc_hint_init(osd_req, which,
-                                            rbd_obj_bytes(&rbd_dev->header),
-                                            rbd_obj_bytes(&rbd_dev->header));
-                       which++;
-               }
-
-               osd_req_op_extent_init(osd_req, which, opcode, offset, length,
-                                      0, 0);
-               if (type == OBJ_REQUEST_BIO)
-                       osd_req_op_extent_osd_data_bio(osd_req, which,
-                                       obj_request->bio_list, length);
-               else
-                       osd_req_op_extent_osd_data_pages(osd_req, which,
-                                       obj_request->pages, length,
-                                       offset & ~PAGE_MASK, false, false);
+               obj_request->img_offset = img_offset;
 
-               if (write_request)
-                       rbd_osd_req_format_write(obj_request);
-               else
-                       rbd_osd_req_format_read(obj_request);
+               rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
 
-               obj_request->img_offset = img_offset;
+               rbd_img_request_get(img_request);
 
                img_offset += length;
                resid -= length;
@@ -2386,7 +2506,8 @@ rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
        struct page **pages;
        u32 page_count;
 
-       rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+       rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
+               obj_request->type == OBJ_REQUEST_NODATA);
        rbd_assert(obj_request_img_data_test(obj_request));
        img_request = obj_request->img_request;
        rbd_assert(img_request);
@@ -2424,11 +2545,10 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
        struct ceph_osd_client *osdc;
        struct rbd_device *rbd_dev;
        struct page **pages;
+       enum obj_operation_type op_type;
        u32 page_count;
        int img_result;
        u64 parent_length;
-       u64 offset;
-       u64 length;
 
        rbd_assert(img_request_child_test(img_request));
 
@@ -2492,26 +2612,10 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
        osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
                                                false, false);
 
-       /* Then the hint op */
+       /* Add the other op(s) */
 
-       osd_req_op_alloc_hint_init(osd_req, 1, rbd_obj_bytes(&rbd_dev->header),
-                                  rbd_obj_bytes(&rbd_dev->header));
-
-       /* And the original write request op */
-
-       offset = orig_request->offset;
-       length = orig_request->length;
-       osd_req_op_extent_init(osd_req, 2, CEPH_OSD_OP_WRITE,
-                                       offset, length, 0, 0);
-       if (orig_request->type == OBJ_REQUEST_BIO)
-               osd_req_op_extent_osd_data_bio(osd_req, 2,
-                                       orig_request->bio_list, length);
-       else
-               osd_req_op_extent_osd_data_pages(osd_req, 2,
-                                       orig_request->pages, length,
-                                       offset & ~PAGE_MASK, false, false);
-
-       rbd_osd_req_format_write(orig_request);
+       op_type = rbd_img_request_op_type(orig_request->img_request);
+       rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
 
        /* All set, send it off. */
 
@@ -2728,7 +2832,7 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
 
        rbd_assert(obj_request->img_request);
        rbd_dev = obj_request->img_request->rbd_dev;
-       stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
+       stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
                                                   stat_request);
        if (!stat_request->osd_req)
                goto out;
@@ -2748,11 +2852,10 @@ out:
        return ret;
 }
 
-static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
+static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
 {
        struct rbd_img_request *img_request;
        struct rbd_device *rbd_dev;
-       bool known;
 
        rbd_assert(obj_request_img_data_test(obj_request));
 
@@ -2760,22 +2863,44 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
        rbd_assert(img_request);
        rbd_dev = img_request->rbd_dev;
 
+       /* Reads */
+       if (!img_request_write_test(img_request) &&
+           !img_request_discard_test(img_request))
+               return true;
+
+       /* Non-layered writes */
+       if (!img_request_layered_test(img_request))
+               return true;
+
+       /*
+        * Layered writes outside of the parent overlap range don't
+        * share any data with the parent.
+        */
+       if (!obj_request_overlaps_parent(obj_request))
+               return true;
+
        /*
-        * Only writes to layered images need special handling.
-        * Reads and non-layered writes are simple object requests.
-        * Layered writes that start beyond the end of the overlap
-        * with the parent have no parent data, so they too are
-        * simple object requests.  Finally, if the target object is
-        * known to already exist, its parent data has already been
-        * copied, so a write to the object can also be handled as a
-        * simple object request.
+        * Entire-object layered writes - we will overwrite whatever
+        * parent data there is anyway.
         */
-       if (!img_request_write_test(img_request) ||
-               !img_request_layered_test(img_request) ||
-               !obj_request_overlaps_parent(obj_request) ||
-               ((known = obj_request_known_test(obj_request)) &&
-                       obj_request_exists_test(obj_request))) {
+       if (!obj_request->offset &&
+           obj_request->length == rbd_obj_bytes(&rbd_dev->header))
+               return true;
+
+       /*
+        * If the object is known to already exist, its parent data has
+        * already been copied.
+        */
+       if (obj_request_known_test(obj_request) &&
+           obj_request_exists_test(obj_request))
+               return true;
+
+       return false;
+}
 
+static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
+{
+       if (img_obj_request_simple(obj_request)) {
                struct rbd_device *rbd_dev;
                struct ceph_osd_client *osdc;
 
@@ -2791,7 +2916,7 @@ static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
         * start by reading the data for the full target object from
         * the parent so we can use it for a copyup to the target.
         */
-       if (known)
+       if (obj_request_known_test(obj_request))
                return rbd_img_obj_parent_read_full(obj_request);
 
        /* We don't know whether the target exists.  Go find out. */
@@ -2932,7 +3057,7 @@ static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
                return -ENOMEM;
 
        ret = -ENOMEM;
-       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
                                                  obj_request);
        if (!obj_request->osd_req)
                goto out;
@@ -2995,7 +3120,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper(
        if (!obj_request)
                return ERR_PTR(-ENOMEM);
 
-       obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1,
                                                  obj_request);
        if (!obj_request->osd_req) {
                ret = -ENOMEM;
@@ -3133,7 +3258,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
        obj_request->pages = pages;
        obj_request->page_count = page_count;
 
-       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
                                                  obj_request);
        if (!obj_request->osd_req)
                goto out;
@@ -3183,11 +3308,20 @@ out:
 static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 {
        struct rbd_img_request *img_request;
+       struct ceph_snap_context *snapc = NULL;
        u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
        u64 length = blk_rq_bytes(rq);
-       bool wr = rq_data_dir(rq) == WRITE;
+       enum obj_operation_type op_type;
+       u64 mapping_size;
        int result;
 
+       if (rq->cmd_flags & REQ_DISCARD)
+               op_type = OBJ_OP_DISCARD;
+       else if (rq->cmd_flags & REQ_WRITE)
+               op_type = OBJ_OP_WRITE;
+       else
+               op_type = OBJ_OP_READ;
+
        /* Ignore/skip any zero-length requests */
 
        if (!length) {
@@ -3196,9 +3330,9 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
                goto err_rq;
        }
 
-       /* Disallow writes to a read-only device */
+       /* Only reads are allowed to a read-only device */
 
-       if (wr) {
+       if (op_type != OBJ_OP_READ) {
                if (rbd_dev->mapping.read_only) {
                        result = -EROFS;
                        goto err_rq;
@@ -3226,21 +3360,35 @@ static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
                goto err_rq;    /* Shouldn't happen */
        }
 
-       if (offset + length > rbd_dev->mapping.size) {
+       down_read(&rbd_dev->header_rwsem);
+       mapping_size = rbd_dev->mapping.size;
+       if (op_type != OBJ_OP_READ) {
+               snapc = rbd_dev->header.snapc;
+               ceph_get_snap_context(snapc);
+       }
+       up_read(&rbd_dev->header_rwsem);
+
+       if (offset + length > mapping_size) {
                rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
-                        length, rbd_dev->mapping.size);
+                        length, mapping_size);
                result = -EIO;
                goto err_rq;
        }
 
-       img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
+       img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
+                                            snapc);
        if (!img_request) {
                result = -ENOMEM;
                goto err_rq;
        }
        img_request->rq = rq;
 
-       result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
+       if (op_type == OBJ_OP_DISCARD)
+               result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
+                                             NULL);
+       else
+               result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
+                                             rq->bio);
        if (result)
                goto err_img_request;
 
@@ -3255,7 +3403,9 @@ err_img_request:
 err_rq:
        if (result)
                rbd_warn(rbd_dev, "%s %llx at %llx result %d",
-                        wr ? "write" : "read", length, offset, result);
+                        obj_op_name(op_type), length, offset, result);
+       if (snapc)
+               ceph_put_snap_context(snapc);
        blk_end_request_all(rq, result);
 }
 
@@ -3393,7 +3543,7 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
        obj_request->pages = pages;
        obj_request->page_count = page_count;
 
-       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
                                                  obj_request);
        if (!obj_request->osd_req)
                goto out;
@@ -3610,6 +3760,13 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);
 
+       /* enable the discard support */
+       queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
+       q->limits.discard_granularity = segment_size;
+       q->limits.discard_alignment = segment_size;
+       q->limits.max_discard_sectors = segment_size / SECTOR_SIZE;
+       q->limits.discard_zeroes_data = 1;
+
        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;
 
@@ -4924,7 +5081,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
                ret = image_id ? 0 : -ENOMEM;
                if (!ret)
                        rbd_dev->image_format = 1;
-       } else if (ret > sizeof (__le32)) {
+       } else if (ret >= 0) {
                void *p = response;
 
                image_id = ceph_extract_encoded_string(&p, p + ret,
@@ -4932,8 +5089,6 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
                ret = PTR_ERR_OR_ZERO(image_id);
                if (!ret)
                        rbd_dev->image_format = 2;
-       } else {
-               ret = -EINVAL;
        }
 
        if (!ret) {
@@ -5087,7 +5242,8 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
        set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
        set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
-       rbd_dev->rq_wq = alloc_workqueue("%s", 0, 0, rbd_dev->disk->disk_name);
+       rbd_dev->rq_wq = alloc_workqueue("%s", WQ_MEM_RECLAIM, 0,
+                                        rbd_dev->disk->disk_name);
        if (!rbd_dev->rq_wq) {
                ret = -ENOMEM;
                goto err_out_mapping;
index cebf2eb..5bd853b 100644 (file)
@@ -169,36 +169,109 @@ out:
        return ret;
 }
 
-int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir)
+/*
+ * Compute the ACLs a new inode created in @dir should start with and
+ * serialize them so they can be attached to the MDS create request.
+ *
+ * posix_acl_create() supplies the access/default ACLs and may adjust *@mode.
+ * An access ACL that posix_acl_equiv_mode() reports as fully described by
+ * the mode bits is dropped.  If no ACL remains, 0 is returned and @info is
+ * left untouched.  Otherwise a pagelist is built holding a 32-bit entry
+ * count followed, per ACL, by the xattr name, a 32-bit value length and the
+ * xattr value; the ACLs and the pagelist are then stored in @info.
+ *
+ * Returns 0 on success or a negative errno.  On failure every reference
+ * taken here is dropped again and @info is not modified.
+ */
+int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
+                      struct ceph_acls_info *info)
 {
-       struct posix_acl *default_acl, *acl;
-       umode_t new_mode = inode->i_mode;
-       int error;
-
-       error = posix_acl_create(dir, &new_mode, &default_acl, &acl);
-       if (error)
-               return error;
-
-       if (!default_acl && !acl) {
-               cache_no_acl(inode);
-               if (new_mode != inode->i_mode) {
-                       struct iattr newattrs = {
-                               .ia_mode = new_mode,
-                               .ia_valid = ATTR_MODE,
-                       };
-                       error = ceph_setattr(dentry, &newattrs);
+       struct posix_acl *acl, *default_acl;
+       size_t val_size1 = 0, val_size2 = 0;
+       struct ceph_pagelist *pagelist = NULL;
+       void *tmp_buf = NULL;
+       int err;
+
+       err = posix_acl_create(dir, mode, &default_acl, &acl);
+       if (err)
+               return err;
+
+       if (acl) {
+               /* keep the result in err so a failure is actually returned */
+               err = posix_acl_equiv_mode(acl, mode);
+               if (err < 0)
+                       goto out_err;
+               if (err == 0) {
+                       posix_acl_release(acl);
+                       acl = NULL;
                 }
-               return error;
         }
 
-       if (default_acl) {
-               error = ceph_set_acl(inode, default_acl, ACL_TYPE_DEFAULT);
-               posix_acl_release(default_acl);
-       }
+       if (!default_acl && !acl)
+               return 0;
+
+       if (acl)
+               val_size1 = posix_acl_xattr_size(acl->a_count);
+       if (default_acl)
+               val_size2 = posix_acl_xattr_size(default_acl->a_count);
+
+       err = -ENOMEM;
+       /* one scratch buffer, sized for the larger of the two xattr values */
+       tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS);
+       if (!tmp_buf)
+               goto out_err;
+       pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS);
+       if (!pagelist)
+               goto out_err;
+       ceph_pagelist_init(pagelist);
+
+       err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
+       if (err)
+               goto out_err;
+
+       /* number of xattr entries that follow */
+       ceph_pagelist_encode_32(pagelist, acl && default_acl ? 2 : 1);
+
         if (acl) {
-               if (!error)
-                       error = ceph_set_acl(inode, acl, ACL_TYPE_ACCESS);
-               posix_acl_release(acl);
+               size_t len = strlen(POSIX_ACL_XATTR_ACCESS);
+               err = ceph_pagelist_reserve(pagelist, len + val_size1 + 8);
+               if (err)
+                       goto out_err;
+               ceph_pagelist_encode_string(pagelist, POSIX_ACL_XATTR_ACCESS,
+                                           len);
+               err = posix_acl_to_xattr(&init_user_ns, acl,
+                                        tmp_buf, val_size1);
+               if (err < 0)
+                       goto out_err;
+               ceph_pagelist_encode_32(pagelist, val_size1);
+               ceph_pagelist_append(pagelist, tmp_buf, val_size1);
         }
-       return error;
+       if (default_acl) {
+               size_t len = strlen(POSIX_ACL_XATTR_DEFAULT);
+               err = ceph_pagelist_reserve(pagelist, len + val_size2 + 8);
+               if (err)
+                       goto out_err;
+               /* result not kept, as in the access-ACL branch above */
+               ceph_pagelist_encode_string(pagelist, POSIX_ACL_XATTR_DEFAULT,
+                                           len);
+               err = posix_acl_to_xattr(&init_user_ns, default_acl,
+                                        tmp_buf, val_size2);
+               if (err < 0)
+                       goto out_err;
+               ceph_pagelist_encode_32(pagelist, val_size2);
+               ceph_pagelist_append(pagelist, tmp_buf, val_size2);
+       }
+
+       kfree(tmp_buf);
+
+       /* hand the ACL references and the pagelist over to the caller */
+       info->acl = acl;
+       info->default_acl = default_acl;
+       info->pagelist = pagelist;
+       return 0;
+
+out_err:
+       posix_acl_release(acl);
+       posix_acl_release(default_acl);
+       kfree(tmp_buf);
+       if (pagelist)
+               ceph_pagelist_release(pagelist);
+       return err;
+}
+
+/*
+ * Pass the access and default ACLs held in @info to ceph_set_cached_acl()
+ * for @inode.  Does nothing when @inode is NULL.
+ */
+void ceph_init_inode_acls(struct inode *inode, struct ceph_acls_info *info)
+{
+       if (inode) {
+               ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, info->acl);
+               ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT,
+                                   info->default_acl);
+       }
+}
+
+/*
+ * Drop everything still referenced by @info: both ACLs and, if one is
+ * still attached, the pagelist.
+ */
+void ceph_release_acls_info(struct ceph_acls_info *info)
+{
+       struct ceph_pagelist *pl = info->pagelist;
+
+       posix_acl_release(info->acl);
+       posix_acl_release(info->default_acl);
+       if (pl)
+               ceph_pagelist_release(pl);
 }
index 90b3954..18c06bb 100644 (file)
@@ -1076,12 +1076,6 @@ retry_locked:
        /* past end of file? */
        i_size = inode->i_size;   /* caller holds i_mutex */
 
-       if (i_size + len > inode->i_sb->s_maxbytes) {
-               /* file is too big */
-               r = -EINVAL;
-               goto fail;
-       }
-
        if (page_off >= i_size ||
            (pos_in_page == 0 && (pos+len) >= i_size &&
             end_in_page - pos_in_page != PAGE_CACHE_SIZE)) {
@@ -1099,9 +1093,6 @@ retry_locked:
        if (r < 0)
                goto fail_nosnap;
        goto retry_locked;
-
-fail:
-       up_read(&mdsc->snap_rwsem);
 fail_nosnap:
        unlock_page(page);
        return r;
index 6d1cd45..659f2ea 100644 (file)
@@ -2397,12 +2397,12 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
        u64 max_size = le64_to_cpu(grant->max_size);
        struct timespec mtime, atime, ctime;
        int check_caps = 0;
-       bool wake = 0;
-       bool writeback = 0;
-       bool queue_trunc = 0;
-       bool queue_invalidate = 0;
-       bool queue_revalidate = 0;
-       bool deleted_inode = 0;
+       bool wake = false;
+       bool writeback = false;
+       bool queue_trunc = false;
+       bool queue_invalidate = false;
+       bool queue_revalidate = false;
+       bool deleted_inode = false;
 
        dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
             inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2437,7 +2437,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
                        /* there were locked pages.. invalidate later
                           in a separate thread. */
                        if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
-                               queue_invalidate = 1;
+                               queue_invalidate = true;
                                ci->i_rdcache_revoking = ci->i_rdcache_gen;
                        }
                }
@@ -2466,7 +2466,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
                set_nlink(inode, le32_to_cpu(grant->nlink));
                if (inode->i_nlink == 0 &&
                    (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
-                       deleted_inode = 1;
+                       deleted_inode = true;
        }
 
        if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
@@ -2487,7 +2487,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
        /* Do we need to revalidate our fscache cookie. Don't bother on the
         * first cache cap as we already validate at cookie creation time. */
        if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1)
-               queue_revalidate = 1;
+               queue_revalidate = true;
 
        if (newcaps & CEPH_CAP_ANY_RD) {
                /* ctime/mtime/atime? */
@@ -2516,7 +2516,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
                                ci->i_wanted_max_size = 0;  /* reset */
                                ci->i_requested_max_size = 0;
                        }
-                       wake = 1;
+                       wake = true;
                }
        }
 
@@ -2546,7 +2546,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
                     ceph_cap_string(newcaps),
                     ceph_cap_string(revoking));
                if (revoking & used & CEPH_CAP_FILE_BUFFER)
-                       writeback = 1;  /* initiate writeback; will delay ack */
+                       writeback = true;  /* initiate writeback; will delay ack */
                else if (revoking == CEPH_CAP_FILE_CACHE &&
                         (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
                         queue_invalidate)
@@ -2572,7 +2572,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
                cap->implemented |= newcaps; /* add bits only, to
                                              * avoid stepping on a
                                              * pending revocation */
-               wake = 1;
+               wake = true;
        }
        BUG_ON(cap->issued & ~cap->implemented);
 
@@ -2586,7 +2586,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
                kick_flushing_inode_caps(mdsc, session, inode);
                up_read(&mdsc->snap_rwsem);
                if (newcaps & ~issued)
-                       wake = 1;
+                       wake = true;
        }
 
        if (queue_trunc) {
@@ -3045,6 +3045,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                }
        }
 
+       /* lookup ino */
+       inode = ceph_find_inode(sb, vino);
+       ci = ceph_inode(inode);
+       dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
+            vino.snap, inode);
+
        mutex_lock(&session->s_mutex);
        session->s_seq++;
        dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -3053,11 +3059,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        if (op == CEPH_CAP_OP_IMPORT)
                ceph_add_cap_releases(mdsc, session);
 
-       /* lookup ino */
-       inode = ceph_find_inode(sb, vino);
-       ci = ceph_inode(inode);
-       dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
-            vino.snap, inode);
        if (!inode) {
                dout(" i don't have ino %llx\n", vino.ino);
 
index 5a743ac..5d5a4c8 100644 (file)
@@ -158,10 +158,47 @@ static int dentry_lru_show(struct seq_file *s, void *ptr)
        return 0;
 }
 
+/*
+ * debugfs "mds_sessions": print the client's global_id, its mount name and
+ * one "mds.<rank> <state>" line for every MDS session that currently exists.
+ */
+static int mds_sessions_show(struct seq_file *s, void *ptr)
+{
+       struct ceph_fs_client *fsc = s->private;
+       struct ceph_mds_client *mdsc = fsc->mdsc;
+       struct ceph_auth_client *auth = fsc->client->monc.auth;
+       struct ceph_options *options = fsc->client->options;
+       const char *name = options->name ? options->name : "";
+       int rank;
+
+       mutex_lock(&mdsc->mutex);
+
+       /* The 'num' portion of an 'entity name' */
+       seq_printf(s, "global_id %llu\n", auth->global_id);
+
+       /* The -o name mount argument */
+       seq_printf(s, "name \"%s\"\n", name);
+
+       /* One line per existing session: rank and state */
+       for (rank = 0; rank < mdsc->max_sessions; rank++) {
+               struct ceph_mds_session *sess;
+
+               sess = __ceph_lookup_mds_session(mdsc, rank);
+               if (sess) {
+                       /* mdsc->mutex is released while printing */
+                       mutex_unlock(&mdsc->mutex);
+                       seq_printf(s, "mds.%d %s\n", sess->s_mds,
+                                  ceph_session_state_name(sess->s_state));
+
+                       ceph_put_mds_session(sess);
+                       mutex_lock(&mdsc->mutex);
+               }
+       }
+       mutex_unlock(&mdsc->mutex);
+
+       return 0;
+}
+
 CEPH_DEFINE_SHOW_FUNC(mdsmap_show)
 CEPH_DEFINE_SHOW_FUNC(mdsc_show)
 CEPH_DEFINE_SHOW_FUNC(caps_show)
 CEPH_DEFINE_SHOW_FUNC(dentry_lru_show)
+CEPH_DEFINE_SHOW_FUNC(mds_sessions_show)
 
 
 /*
@@ -193,6 +230,7 @@ void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc)
        debugfs_remove(fsc->debugfs_bdi);
        debugfs_remove(fsc->debugfs_congestion_kb);
        debugfs_remove(fsc->debugfs_mdsmap);
+       debugfs_remove(fsc->debugfs_mds_sessions);
        debugfs_remove(fsc->debugfs_caps);
        debugfs_remove(fsc->debugfs_mdsc);
        debugfs_remove(fsc->debugfs_dentry_lru);
@@ -231,6 +269,14 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
        if (!fsc->debugfs_mdsmap)
                goto out;
 
+       fsc->debugfs_mds_sessions = debugfs_create_file("mds_sessions",
+                                       0600,
+                                       fsc->client->debugfs_dir,
+                                       fsc,
+                                       &mds_sessions_show_fops);
+       if (!fsc->debugfs_mds_sessions)
+               goto out;
+
        fsc->debugfs_mdsc = debugfs_create_file("mdsc",
                                                0600,
                                                fsc->client->debugfs_dir,
index b6c59ea..e6d63f8 100644 (file)
@@ -682,17 +682,22 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
+       struct ceph_acls_info acls = {};
        int err;
 
        if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;
 
+       err = ceph_pre_init_acls(dir, &mode, &acls);
+       if (err < 0)
+               return err;
+
        dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n",
             dir, dentry, mode, rdev);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
        if (IS_ERR(req)) {
-               d_drop(dentry);
-               return PTR_ERR(req);
+               err = PTR_ERR(req);
+               goto out;
        }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
@@ -701,15 +706,20 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
        req->r_args.mknod.rdev = cpu_to_le32(rdev);
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+       if (acls.pagelist) {
+               req->r_pagelist = acls.pagelist;
+               acls.pagelist = NULL;
+       }
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
        ceph_mdsc_put_request(req);
-
+out:
        if (!err)
-               ceph_init_acl(dentry, dentry->d_inode, dir);
+               ceph_init_inode_acls(dentry->d_inode, &acls);
        else
                d_drop(dentry);
+       ceph_release_acls_info(&acls);
        return err;
 }
 
@@ -733,8 +743,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
        dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
        if (IS_ERR(req)) {
-               d_drop(dentry);
-               return PTR_ERR(req);
+               err = PTR_ERR(req);
+               goto out;
        }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
@@ -746,9 +756,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
        if (!err && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
        ceph_mdsc_put_request(req);
-       if (!err)
-               ceph_init_acl(dentry, dentry->d_inode, dir);
-       else
+out:
+       if (err)
                d_drop(dentry);
        return err;
 }
@@ -758,6 +767,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
+       struct ceph_acls_info acls = {};
        int err = -EROFS;
        int op;
 
@@ -772,6 +782,12 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
        } else {
                goto out;
        }
+
+       mode |= S_IFDIR;
+       err = ceph_pre_init_acls(dir, &mode, &acls);
+       if (err < 0)
+               goto out;
+
        req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
@@ -784,15 +800,20 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
        req->r_args.mkdir.mode = cpu_to_le32(mode);
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+       if (acls.pagelist) {
+               req->r_pagelist = acls.pagelist;
+               acls.pagelist = NULL;
+       }
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
        ceph_mdsc_put_request(req);
 out:
        if (!err)
-               ceph_init_acl(dentry, dentry->d_inode, dir);
+               ceph_init_inode_acls(dentry->d_inode, &acls);
        else
                d_drop(dentry);
+       ceph_release_acls_info(&acls);
        return err;
 }
 
index 2eb02f8..d7e0da8 100644 (file)
@@ -235,6 +235,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        struct dentry *dn;
+       struct ceph_acls_info acls = {};
        int err;
 
        dout("atomic_open %p dentry %p '%.*s' %s flags %d mode 0%o\n",
@@ -248,22 +249,34 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
        if (err < 0)
                return err;
 
+       if (flags & O_CREAT) {
+               err = ceph_pre_init_acls(dir, &mode, &acls);
+               if (err < 0)
+                       return err;
+       }
+
        /* do the open */
        req = prepare_open_request(dir->i_sb, flags, mode);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
+       if (IS_ERR(req)) {
+               err = PTR_ERR(req);
+               goto out_acl;
+       }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        if (flags & O_CREAT) {
                req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
                req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+               if (acls.pagelist) {
+                       req->r_pagelist = acls.pagelist;
+                       acls.pagelist = NULL;
+               }
        }
        req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
        err = ceph_mdsc_do_request(mdsc,
                                   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
                                   req);
        if (err)
-               goto out_err;
+               goto out_req;
 
        err = ceph_handle_snapdir(req, dentry, err);
        if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
@@ -278,7 +291,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
                dn = NULL;
        }
        if (err)
-               goto out_err;
+               goto out_req;
        if (dn || dentry->d_inode == NULL || S_ISLNK(dentry->d_inode->i_mode)) {
                /* make vfs retry on splice, ENOENT, or symlink */
                dout("atomic_open finish_no_open on dn %p\n", dn);
@@ -286,15 +299,17 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
        } else {
                dout("atomic_open finish_open on dn %p\n", dn);
                if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
-                       ceph_init_acl(dentry, dentry->d_inode, dir);
+                       ceph_init_inode_acls(dentry->d_inode, &acls);
                        *opened |= FILE_CREATED;
                }
                err = finish_open(file, dentry, ceph_open, opened);
        }
-out_err:
+out_req:
        if (!req->r_err && req->r_target_inode)
                ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
        ceph_mdsc_put_request(req);
+out_acl:
+       ceph_release_acls_info(&acls);
        dout("atomic_open result=%d\n", err);
        return err;
 }
@@ -826,8 +841,7 @@ again:
        ceph_put_cap_refs(ci, got);
 
        if (checkeof && ret >= 0) {
-               int statret = ceph_do_getattr(inode,
-                                             CEPH_STAT_CAP_SIZE);
+               int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
 
                /* hit EOF or hole? */
                if (statret == 0 && iocb->ki_pos < inode->i_size &&
@@ -836,7 +850,6 @@ again:
                             ", reading more\n", iocb->ki_pos,
                             inode->i_size);
 
-                       iov_iter_advance(to, ret);
                        read += ret;
                        len -= ret;
                        checkeof = 0;
@@ -995,7 +1008,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
        mutex_lock(&inode->i_mutex);
 
        if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
-               ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);
+               ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
                if (ret < 0) {
                        offset = ret;
                        goto out;
index 04c89c2..7b61390 100644 (file)
@@ -766,7 +766,7 @@ static int fill_inode(struct inode *inode,
 
        /* xattrs */
        /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
-       if ((issued & CEPH_CAP_XATTR_EXCL) == 0 &&
+       if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))  &&
            le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) {
                if (ci->i_xattrs.blob)
                        ceph_buffer_put(ci->i_xattrs.blob);
@@ -1813,10 +1813,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        if (ia_valid & ATTR_SIZE) {
                dout("setattr %p size %lld -> %lld\n", inode,
                     inode->i_size, attr->ia_size);
-               if (attr->ia_size > inode->i_sb->s_maxbytes) {
-                       err = -EINVAL;
-                       goto out;
-               }
                if ((issued & CEPH_CAP_FILE_EXCL) &&
                    attr->ia_size > inode->i_size) {
                        inode->i_size = attr->ia_size;
@@ -1896,8 +1892,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        if (mask & CEPH_SETATTR_SIZE)
                __ceph_do_pending_vmtruncate(inode);
        return err;
-out:
-       spin_unlock(&ci->i_ceph_lock);
 out_put:
        ceph_mdsc_put_request(req);
        return err;
@@ -1907,7 +1901,7 @@ out_put:
  * Verify that we have a lease on the given mask.  If not,
  * do a getattr against an mds.
  */
-int ceph_do_getattr(struct inode *inode, int mask)
+int ceph_do_getattr(struct inode *inode, int mask, bool force)
 {
        struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
@@ -1920,7 +1914,7 @@ int ceph_do_getattr(struct inode *inode, int mask)
        }
 
        dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
-       if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
+       if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
                return 0;
 
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
@@ -1948,7 +1942,7 @@ int ceph_permission(struct inode *inode, int mask)
        if (mask & MAY_NOT_BLOCK)
                return -ECHILD;
 
-       err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED);
+       err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED, false);
 
        if (!err)
                err = generic_permission(inode, mask);
@@ -1966,7 +1960,7 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
        struct ceph_inode_info *ci = ceph_inode(inode);
        int err;
 
-       err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL);
+       err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL, false);
        if (!err) {
                generic_fillattr(inode, stat);
                stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
index a822a6e..f851d8d 100644 (file)
@@ -19,7 +19,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg)
        struct ceph_ioctl_layout l;
        int err;
 
-       err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT);
+       err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT, false);
        if (!err) {
                l.stripe_unit = ceph_file_layout_su(ci->i_layout);
                l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
@@ -41,7 +41,7 @@ static long __validate_layout(struct ceph_mds_client *mdsc,
        /* validate striping parameters */
        if ((l->object_size & ~PAGE_MASK) ||
            (l->stripe_unit & ~PAGE_MASK) ||
-           (l->stripe_unit != 0 &&
+           ((unsigned)l->stripe_unit != 0 &&
             ((unsigned)l->object_size % (unsigned)l->stripe_unit)))
                return -EINVAL;
 
@@ -74,7 +74,7 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
                return -EFAULT;
 
        /* validate changed params against current layout */
-       err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT);
+       err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT, false);
        if (err)
                return err;
 
index bad07c0..a92d3f5 100644 (file)
@@ -7,6 +7,7 @@
 #include <linux/sched.h>
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
+#include <linux/utsname.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -334,7 +335,7 @@ static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
 /*
  * sessions
  */
-static const char *session_state_name(int s)
+const char *ceph_session_state_name(int s)
 {
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
@@ -542,6 +543,8 @@ void ceph_mdsc_release_request(struct kref *kref)
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
+       if (req->r_pagelist)
+               ceph_pagelist_release(req->r_pagelist);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        kfree(req);
@@ -812,6 +815,74 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
        h = msg->front.iov_base;
        h->op = cpu_to_le32(op);
        h->seq = cpu_to_le64(seq);
+
+       return msg;
+}
+
+/*
+ * Build the CEPH_SESSION_REQUEST_OPEN session message.  Unlike the generic
+ * session message it is marked as version 2 and carries, after the session
+ * head, a map<string, string> of client metadata ("hostname" and
+ * "entity_id").
+ *
+ * Returns the new message, or NULL if it could not be allocated.
+ */
+static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
+{
+       struct ceph_msg *msg;
+       struct ceph_mds_session_head *h;
+       int i;
+       int metadata_bytes = 0;
+       int metadata_key_count = 0;
+       struct ceph_options *opt = mdsc->fsc->client->options;
+       void *p;
+
+       /* key/value pairs; a NULL key terminates the table */
+       const char *metadata[3][2] = {
+               {"hostname", utsname()->nodename},
+               {"entity_id", opt->name ? opt->name : ""},
+               {NULL, NULL}
+       };
+
+       /* Calculate serialized length of metadata */
+       metadata_bytes = 4;  /* map length */
+       for (i = 0; metadata[i][0] != NULL; ++i) {
+               /* two 32-bit length prefixes plus the two strings */
+               metadata_bytes += 8 + strlen(metadata[i][0]) +
+                       strlen(metadata[i][1]);
+               metadata_key_count++;
+       }
+
+       /* Allocate the message */
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes,
+                          GFP_NOFS, false);
+       if (!msg) {
+               pr_err("create_session_open_msg ENOMEM creating msg\n");
+               return NULL;
+       }
+       h = msg->front.iov_base;
+       h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
+       h->seq = cpu_to_le64(seq);
+
+       /*
+        * Serialize client metadata into waiting buffer space, using
+        * the format that userspace expects for map<string, string>
+        */
+
+       /*
+        * ClientSession messages with metadata are v2.  Convert explicitly,
+        * like the other header fields stored with cpu_to_le*() in this file.
+        */
+       msg->hdr.version = cpu_to_le16(2);
+
+       /* The write pointer, following the session_head structure */
+       p = msg->front.iov_base + sizeof(*h);
+
+       /* Number of entries in the map */
+       ceph_encode_32(&p, metadata_key_count);
+
+       /* Two length-prefixed strings for each entry in the map */
+       for (i = 0; metadata[i][0] != NULL; ++i) {
+               size_t const key_len = strlen(metadata[i][0]);
+               size_t const val_len = strlen(metadata[i][1]);
+
+               ceph_encode_32(&p, key_len);
+               memcpy(p, metadata[i][0], key_len);
+               p += key_len;
+               ceph_encode_32(&p, val_len);
+               memcpy(p, metadata[i][1], val_len);
+               p += val_len;
+       }
+
         return msg;
 }
 
@@ -835,7 +906,7 @@ static int __open_session(struct ceph_mds_client *mdsc,
        session->s_renew_requested = jiffies;
 
        /* send connect message */
-       msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
+       msg = create_session_open_msg(mdsc, session->s_seq);
        if (!msg)
                return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
@@ -1164,7 +1235,7 @@ static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
        struct ceph_msg *msg;
 
        dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
-            session->s_mds, session_state_name(session->s_state), seq);
+            session->s_mds, ceph_session_state_name(session->s_state), seq);
        msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
        if (!msg)
                return -ENOMEM;
@@ -1216,7 +1287,7 @@ static int request_close_session(struct ceph_mds_client *mdsc,
        struct ceph_msg *msg;
 
        dout("request_close_session mds%d state %s seq %lld\n",
-            session->s_mds, session_state_name(session->s_state),
+            session->s_mds, ceph_session_state_name(session->s_state),
             session->s_seq);
        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
        if (!msg)
@@ -1847,13 +1918,15 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 
-       if (req->r_data_len) {
-               /* outbound data set only by ceph_sync_setxattr() */
-               BUG_ON(!req->r_pages);
-               ceph_msg_data_add_pages(msg, req->r_pages, req->r_data_len, 0);
+       if (req->r_pagelist) {
+               struct ceph_pagelist *pagelist = req->r_pagelist;
+               atomic_inc(&pagelist->refcnt);
+               ceph_msg_data_add_pagelist(msg, pagelist);
+               msg->hdr.data_len = cpu_to_le32(pagelist->length);
+       } else {
+               msg->hdr.data_len = 0;
        }
 
-       msg->hdr.data_len = cpu_to_le32(req->r_data_len);
        msg->hdr.data_off = cpu_to_le16(0);
 
 out_free2:
@@ -2007,7 +2080,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
        req->r_session = get_session(session);
 
        dout("do_request mds%d session %p state %s\n", mds, session,
-            session_state_name(session->s_state));
+            ceph_session_state_name(session->s_state));
        if (session->s_state != CEPH_MDS_SESSION_OPEN &&
            session->s_state != CEPH_MDS_SESSION_HUNG) {
                if (session->s_state == CEPH_MDS_SESSION_NEW ||
@@ -2078,6 +2151,7 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
                if (req->r_session &&
                    req->r_session->s_mds == mds) {
                        dout(" kicking tid %llu\n", req->r_tid);
+                       list_del_init(&req->r_wait);
                        __do_request(mdsc, req);
                }
        }
@@ -2444,7 +2518,7 @@ static void handle_session(struct ceph_mds_session *session,
 
        dout("handle_session mds%d %s %p state %s seq %llu\n",
             mds, ceph_session_op_name(op), session,
-            session_state_name(session->s_state), seq);
+            ceph_session_state_name(session->s_state), seq);
 
        if (session->s_state == CEPH_MDS_SESSION_HUNG) {
                session->s_state = CEPH_MDS_SESSION_OPEN;
@@ -2471,9 +2545,8 @@ static void handle_session(struct ceph_mds_session *session,
                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
                        pr_info("mds%d reconnect denied\n", session->s_mds);
                remove_session_caps(session);
-               wake = 1; /* for good measure */
+               wake = 2; /* for good measure */
                wake_up_all(&mdsc->session_close_wq);
-               kick_requests(mdsc, mds);
                break;
 
        case CEPH_SESSION_STALE:
@@ -2503,6 +2576,8 @@ static void handle_session(struct ceph_mds_session *session,
        if (wake) {
                mutex_lock(&mdsc->mutex);
                __wake_requests(mdsc, &session->s_waiting);
+               if (wake == 2)
+                       kick_requests(mdsc, mds);
                mutex_unlock(&mdsc->mutex);
        }
        return;
@@ -2695,18 +2770,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        session->s_state = CEPH_MDS_SESSION_RECONNECTING;
        session->s_seq = 0;
 
-       ceph_con_close(&session->s_con);
-       ceph_con_open(&session->s_con,
-                     CEPH_ENTITY_TYPE_MDS, mds,
-                     ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
-
-       /* replay unsafe requests */
-       replay_unsafe_requests(mdsc, session);
-
-       down_read(&mdsc->snap_rwsem);
-
        dout("session %p state %s\n", session,
-            session_state_name(session->s_state));
+            ceph_session_state_name(session->s_state));
 
        spin_lock(&session->s_gen_ttl_lock);
        session->s_cap_gen++;
@@ -2723,6 +2788,19 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        discard_cap_releases(mdsc, session);
        spin_unlock(&session->s_cap_lock);
 
+       /* trim unused caps to reduce MDS's cache rejoin time */
+       shrink_dcache_parent(mdsc->fsc->sb->s_root);
+
+       ceph_con_close(&session->s_con);
+       ceph_con_open(&session->s_con,
+                     CEPH_ENTITY_TYPE_MDS, mds,
+                     ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
+
+       /* replay unsafe requests */
+       replay_unsafe_requests(mdsc, session);
+
+       down_read(&mdsc->snap_rwsem);
+
        /* traverse this session's caps */
        s_nr_caps = session->s_nr_caps;
        err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
@@ -2791,7 +2869,6 @@ fail:
        mutex_unlock(&session->s_mutex);
 fail_nomsg:
        ceph_pagelist_release(pagelist);
-       kfree(pagelist);
 fail_nopagelist:
        pr_err("error %d preparing reconnect for mds%d\n", err, mds);
        return;
@@ -2827,7 +2904,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
                     ceph_mds_state_name(newstate),
                     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
-                    session_state_name(s->s_state));
+                    ceph_session_state_name(s->s_state));
 
                if (i >= newmap->m_max_mds ||
                    memcmp(ceph_mdsmap_get_addr(oldmap, i),
@@ -2939,14 +3016,15 @@ static void handle_lease(struct ceph_mds_client *mdsc,
        if (dname.len != get_unaligned_le32(h+1))
                goto bad;
 
-       mutex_lock(&session->s_mutex);
-       session->s_seq++;
-
        /* lookup inode */
        inode = ceph_find_inode(sb, vino);
        dout("handle_lease %s, ino %llx %p %.*s\n",
             ceph_lease_op_name(h->action), vino.ino, inode,
             dname.len, dname.name);
+
+       mutex_lock(&session->s_mutex);
+       session->s_seq++;
+
        if (inode == NULL) {
                dout("handle_lease no inode %llx\n", vino.ino);
                goto release;
index e00737c..3288359 100644 (file)
@@ -202,9 +202,7 @@ struct ceph_mds_request {
        bool r_direct_is_hash;  /* true if r_direct_hash is valid */
 
        /* data payload is used for xattr ops */
-       struct page **r_pages;
-       int r_num_pages;
-       int r_data_len;
+       struct ceph_pagelist *r_pagelist;
 
        /* what caps shall we drop? */
        int r_inode_drop, r_inode_unless;
@@ -332,6 +330,8 @@ ceph_get_mds_session(struct ceph_mds_session *s)
        return s;
 }
 
+extern const char *ceph_session_state_name(int s);
+
 extern void ceph_put_mds_session(struct ceph_mds_session *s);
 
 extern int ceph_send_msg_mds(struct ceph_mds_client *mdsc,
index 12b2074..b82f507 100644 (file)
@@ -95,6 +95,7 @@ struct ceph_fs_client {
        struct dentry *debugfs_congestion_kb;
        struct dentry *debugfs_bdi;
        struct dentry *debugfs_mdsc, *debugfs_mdsmap;
+       struct dentry *debugfs_mds_sessions;
 #endif
 
 #ifdef CONFIG_CEPH_FSCACHE
@@ -714,7 +715,7 @@ extern void ceph_queue_vmtruncate(struct inode *inode);
 extern void ceph_queue_invalidate(struct inode *inode);
 extern void ceph_queue_writeback(struct inode *inode);
 
-extern int ceph_do_getattr(struct inode *inode, int mask);
+extern int ceph_do_getattr(struct inode *inode, int mask, bool force);
 extern int ceph_permission(struct inode *inode, int mask);
 extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
 extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
@@ -733,15 +734,23 @@ extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci);
 extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
 extern void __init ceph_xattr_init(void);
 extern void ceph_xattr_exit(void);
+extern const struct xattr_handler *ceph_xattr_handlers[];
 
 /* acl.c */
-extern const struct xattr_handler *ceph_xattr_handlers[];
+struct ceph_acls_info {
+       void *default_acl;
+       void *acl;
+       struct ceph_pagelist *pagelist;
+};
 
 #ifdef CONFIG_CEPH_FS_POSIX_ACL
 
 struct posix_acl *ceph_get_acl(struct inode *, int);
 int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type);
-int ceph_init_acl(struct dentry *, struct inode *, struct inode *);
+int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
+                      struct ceph_acls_info *info);
+void ceph_init_inode_acls(struct inode *inode, struct ceph_acls_info *info);
+void ceph_release_acls_info(struct ceph_acls_info *info);
 
 static inline void ceph_forget_all_cached_acls(struct inode *inode)
 {
@@ -753,12 +762,18 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
 #define ceph_get_acl NULL
 #define ceph_set_acl NULL
 
-static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode,
-                               struct inode *dir)
+static inline int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
+                                    struct ceph_acls_info *info)
 {
        return 0;
 }
-
+static inline void ceph_init_inode_acls(struct inode *inode,
+                                       struct ceph_acls_info *info)
+{
+}
+static inline void ceph_release_acls_info(struct ceph_acls_info *info)
+{
+}
 static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode)
 {
        return 0;
index 12f58d2..678b0d2 100644 (file)
@@ -1,4 +1,5 @@
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/pagelist.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -284,8 +285,7 @@ static size_t ceph_vxattrs_name_size(struct ceph_vxattr *vxattrs)
                return ceph_dir_vxattrs_name_size;
        if (vxattrs == ceph_file_vxattrs)
                return ceph_file_vxattrs_name_size;
-       BUG();
-
+       BUG_ON(vxattrs);
        return 0;
 }
 
@@ -736,24 +736,20 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
        dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
             ci->i_xattrs.version, ci->i_xattrs.index_version);
 
-       if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
-           (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
-               goto get_xattr;
-       } else {
+       if (ci->i_xattrs.version == 0 ||
+           !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
                spin_unlock(&ci->i_ceph_lock);
                /* get xattrs from mds (if we don't already have them) */
-               err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR);
+               err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
                if (err)
                        return err;
+               spin_lock(&ci->i_ceph_lock);
        }
 
-       spin_lock(&ci->i_ceph_lock);
-
        err = __build_xattrs(inode);
        if (err < 0)
                goto out;
 
-get_xattr:
        err = -ENODATA;  /* == ENOATTR */
        xattr = __get_xattr(ci, name);
        if (!xattr)
@@ -798,23 +794,18 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
        dout("listxattr %p ver=%lld index_ver=%lld\n", inode,
             ci->i_xattrs.version, ci->i_xattrs.index_version);
 
-       if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
-           (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
-               goto list_xattr;
-       } else {
+       if (ci->i_xattrs.version == 0 ||
+           !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
                spin_unlock(&ci->i_ceph_lock);
-               err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR);
+               err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
                if (err)
                        return err;
+               spin_lock(&ci->i_ceph_lock);
        }
 
-       spin_lock(&ci->i_ceph_lock);
-
        err = __build_xattrs(inode);
        if (err < 0)
                goto out;
-
-list_xattr:
        /*
         * Start with virtual dir xattr names (if any) (including
         * terminating '\0' characters for each).
@@ -860,35 +851,25 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_mds_request *req;
        struct ceph_mds_client *mdsc = fsc->mdsc;
+       struct ceph_pagelist *pagelist = NULL;
        int err;
-       int i, nr_pages;
-       struct page **pages = NULL;
-       void *kaddr;
-
-       /* copy value into some pages */
-       nr_pages = calc_pages_for(0, size);
-       if (nr_pages) {
-               pages = kmalloc(sizeof(pages[0])*nr_pages, GFP_NOFS);
-               if (!pages)
+
+       if (value) {
+               /* copy value into pagelist */
+               pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+               if (!pagelist)
                        return -ENOMEM;
-               err = -ENOMEM;
-               for (i = 0; i < nr_pages; i++) {
-                       pages[i] = __page_cache_alloc(GFP_NOFS);
-                       if (!pages[i]) {
-                               nr_pages = i;
-                               goto out;
-                       }
-                       kaddr = kmap(pages[i]);
-                       memcpy(kaddr, value + i*PAGE_CACHE_SIZE,
-                              min(PAGE_CACHE_SIZE, size-i*PAGE_CACHE_SIZE));
-               }
+
+               ceph_pagelist_init(pagelist);
+               err = ceph_pagelist_append(pagelist, value, size);
+               if (err)
+                       goto out;
+       } else {
+               flags |= CEPH_XATTR_REMOVE;
        }
 
        dout("setxattr value=%.*s\n", (int)size, value);
 
-       if (!value)
-               flags |= CEPH_XATTR_REMOVE;
-
        /* do request */
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETXATTR,
                                       USE_AUTH_MDS);
@@ -903,9 +884,8 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
        req->r_args.setxattr.flags = cpu_to_le32(flags);
        req->r_path2 = kstrdup(name, GFP_NOFS);
 
-       req->r_pages = pages;
-       req->r_num_pages = nr_pages;
-       req->r_data_len = size;
+       req->r_pagelist = pagelist;
+       pagelist = NULL;
 
        dout("xattr.ver (before): %lld\n", ci->i_xattrs.version);
        err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -913,11 +893,8 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
        dout("xattr.ver (after): %lld\n", ci->i_xattrs.version);
 
 out:
-       if (pages) {
-               for (i = 0; i < nr_pages; i++)
-                       __free_page(pages[i]);
-               kfree(pages);
-       }
+       if (pagelist)
+               ceph_pagelist_release(pagelist);
        return err;
 }
 
@@ -968,7 +945,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
 retry:
        issued = __ceph_caps_issued(ci, NULL);
        dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
-       if (!(issued & CEPH_CAP_XATTR_EXCL))
+       if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
                goto do_sync;
        __build_xattrs(inode);
 
@@ -1077,7 +1054,7 @@ retry:
        issued = __ceph_caps_issued(ci, NULL);
        dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
 
-       if (!(issued & CEPH_CAP_XATTR_EXCL))
+       if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
                goto do_sync;
        __build_xattrs(inode);
 
index 279b0af..07bc359 100644 (file)
@@ -211,7 +211,6 @@ extern struct page **ceph_get_direct_page_vector(const void __user *data,
                                                 bool write_page);
 extern void ceph_put_page_vector(struct page **pages, int num_pages,
                                 bool dirty);
-extern void ceph_release_page_vector(struct page **pages, int num_pages);
 extern struct page **ceph_alloc_page_vector(int num_pages, gfp_t flags);
 extern int ceph_copy_user_to_page_vector(struct page **pages,
                                         const void __user *data,
index 9660d6b..5f871d8 100644 (file)
@@ -2,6 +2,7 @@
 #define __FS_CEPH_PAGELIST_H
 
 #include <linux/list.h>
+#include <linux/atomic.h>
 
 struct ceph_pagelist {
        struct list_head head;
@@ -10,6 +11,7 @@ struct ceph_pagelist {
        size_t room;
        struct list_head free_list;
        size_t num_pages_free;
+       atomic_t refcnt;
 };
 
 struct ceph_pagelist_cursor {
@@ -26,9 +28,10 @@ static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
        pl->room = 0;
        INIT_LIST_HEAD(&pl->free_list);
        pl->num_pages_free = 0;
+       atomic_set(&pl->refcnt, 1);
 }
 
-extern int ceph_pagelist_release(struct ceph_pagelist *pl);
+extern void ceph_pagelist_release(struct ceph_pagelist *pl);
 
 extern int ceph_pagelist_append(struct ceph_pagelist *pl, const void *d, size_t l);
 
index f20e0d8..2f822dc 100644 (file)
@@ -172,6 +172,7 @@ extern const char *ceph_osd_state_name(int s);
 #define CEPH_OSD_OP_MODE_WR    0x2000
 #define CEPH_OSD_OP_MODE_RMW   0x3000
 #define CEPH_OSD_OP_MODE_SUB   0x4000
+#define CEPH_OSD_OP_MODE_CACHE 0x8000
 
 #define CEPH_OSD_OP_TYPE       0x0f00
 #define CEPH_OSD_OP_TYPE_LOCK  0x0100
@@ -181,103 +182,135 @@ extern const char *ceph_osd_state_name(int s);
 #define CEPH_OSD_OP_TYPE_PG    0x0500
 #define CEPH_OSD_OP_TYPE_MULTI 0x0600 /* multiobject */
 
+#define __CEPH_OSD_OP1(mode, nr) \
+       (CEPH_OSD_OP_MODE_##mode | (nr))
+
+#define __CEPH_OSD_OP(mode, type, nr) \
+       (CEPH_OSD_OP_MODE_##mode | CEPH_OSD_OP_TYPE_##type | (nr))
+
+#define __CEPH_FORALL_OSD_OPS(f)                                           \
+       /** data **/                                                        \
+       /* read */                                                          \
+       f(READ,         __CEPH_OSD_OP(RD, DATA, 1),     "read")             \
+       f(STAT,         __CEPH_OSD_OP(RD, DATA, 2),     "stat")             \
+       f(MAPEXT,       __CEPH_OSD_OP(RD, DATA, 3),     "mapext")           \
+                                                                           \
+       /* fancy read */                                                    \
+       f(MASKTRUNC,    __CEPH_OSD_OP(RD, DATA, 4),     "masktrunc")        \
+       f(SPARSE_READ,  __CEPH_OSD_OP(RD, DATA, 5),     "sparse-read")      \
+                                                                           \
+       f(NOTIFY,       __CEPH_OSD_OP(RD, DATA, 6),     "notify")           \
+       f(NOTIFY_ACK,   __CEPH_OSD_OP(RD, DATA, 7),     "notify-ack")       \
+                                                                           \
+       /* versioning */                                                    \
+       f(ASSERT_VER,   __CEPH_OSD_OP(RD, DATA, 8),     "assert-version")   \
+                                                                           \
+       f(LIST_WATCHERS, __CEPH_OSD_OP(RD, DATA, 9),    "list-watchers")    \
+                                                                           \
+       f(LIST_SNAPS,   __CEPH_OSD_OP(RD, DATA, 10),    "list-snaps")       \
+                                                                           \
+       /* sync */                                                          \
+       f(SYNC_READ,    __CEPH_OSD_OP(RD, DATA, 11),    "sync_read")        \
+                                                                           \
+       /* write */                                                         \
+       f(WRITE,        __CEPH_OSD_OP(WR, DATA, 1),     "write")            \
+       f(WRITEFULL,    __CEPH_OSD_OP(WR, DATA, 2),     "writefull")        \
+       f(TRUNCATE,     __CEPH_OSD_OP(WR, DATA, 3),     "truncate")         \
+       f(ZERO,         __CEPH_OSD_OP(WR, DATA, 4),     "zero")             \
+       f(DELETE,       __CEPH_OSD_OP(WR, DATA, 5),     "delete")           \
+                                                                           \
+       /* fancy write */                                                   \
+       f(APPEND,       __CEPH_OSD_OP(WR, DATA, 6),     "append")           \
+       f(STARTSYNC,    __CEPH_OSD_OP(WR, DATA, 7),     "startsync")        \
+       f(SETTRUNC,     __CEPH_OSD_OP(WR, DATA, 8),     "settrunc")         \
+       f(TRIMTRUNC,    __CEPH_OSD_OP(WR, DATA, 9),     "trimtrunc")        \
+                                                                           \
+       f(TMAPUP,       __CEPH_OSD_OP(RMW, DATA, 10),   "tmapup")           \
+       f(TMAPPUT,      __CEPH_OSD_OP(WR, DATA, 11),    "tmapput")          \
+       f(TMAPGET,      __CEPH_OSD_OP(RD, DATA, 12),    "tmapget")          \
+                                                                           \
+       f(CREATE,       __CEPH_OSD_OP(WR, DATA, 13),    "create")           \
+       f(ROLLBACK,     __CEPH_OSD_OP(WR, DATA, 14),    "rollback")         \
+                                                                           \
+       f(WATCH,        __CEPH_OSD_OP(WR, DATA, 15),    "watch")            \
+                                                                           \
+       /* omap */                                                          \
+       f(OMAPGETKEYS,  __CEPH_OSD_OP(RD, DATA, 17),    "omap-get-keys")    \
+       f(OMAPGETVALS,  __CEPH_OSD_OP(RD, DATA, 18),    "omap-get-vals")    \
+       f(OMAPGETHEADER, __CEPH_OSD_OP(RD, DATA, 19),   "omap-get-header")  \
+       f(OMAPGETVALSBYKEYS, __CEPH_OSD_OP(RD, DATA, 20), "omap-get-vals-by-keys") \
+       f(OMAPSETVALS,  __CEPH_OSD_OP(WR, DATA, 21),    "omap-set-vals")    \
+       f(OMAPSETHEADER, __CEPH_OSD_OP(WR, DATA, 22),   "omap-set-header")  \
+       f(OMAPCLEAR,    __CEPH_OSD_OP(WR, DATA, 23),    "omap-clear")       \
+       f(OMAPRMKEYS,   __CEPH_OSD_OP(WR, DATA, 24),    "omap-rm-keys")     \
+       f(OMAP_CMP,     __CEPH_OSD_OP(RD, DATA, 25),    "omap-cmp")         \
+                                                                           \
+       /* tiering */                                                       \
+       f(COPY_FROM,    __CEPH_OSD_OP(WR, DATA, 26),    "copy-from")        \
+       f(COPY_GET_CLASSIC, __CEPH_OSD_OP(RD, DATA, 27), "copy-get-classic") \
+       f(UNDIRTY,      __CEPH_OSD_OP(WR, DATA, 28),    "undirty")          \
+       f(ISDIRTY,      __CEPH_OSD_OP(RD, DATA, 29),    "isdirty")          \
+       f(COPY_GET,     __CEPH_OSD_OP(RD, DATA, 30),    "copy-get")         \
+       f(CACHE_FLUSH,  __CEPH_OSD_OP(CACHE, DATA, 31), "cache-flush")      \
+       f(CACHE_EVICT,  __CEPH_OSD_OP(CACHE, DATA, 32), "cache-evict")      \
+       f(CACHE_TRY_FLUSH, __CEPH_OSD_OP(CACHE, DATA, 33), "cache-try-flush") \
+                                                                           \
+       /* convert tmap to omap */                                          \
+       f(TMAP2OMAP,    __CEPH_OSD_OP(RMW, DATA, 34),   "tmap2omap")        \
+                                                                           \
+       /* hints */                                                         \
+       f(SETALLOCHINT, __CEPH_OSD_OP(WR, DATA, 35),    "set-alloc-hint")   \
+                                                                           \
+       /** multi **/                                                       \
+       f(CLONERANGE,   __CEPH_OSD_OP(WR, MULTI, 1),    "clonerange")       \
+       f(ASSERT_SRC_VERSION, __CEPH_OSD_OP(RD, MULTI, 2), "assert-src-version") \
+       f(SRC_CMPXATTR, __CEPH_OSD_OP(RD, MULTI, 3),    "src-cmpxattr")     \
+                                                                           \
+       /** attrs **/                                                       \
+       /* read */                                                          \
+       f(GETXATTR,     __CEPH_OSD_OP(RD, ATTR, 1),     "getxattr")         \
+       f(GETXATTRS,    __CEPH_OSD_OP(RD, ATTR, 2),     "getxattrs")        \
+       f(CMPXATTR,     __CEPH_OSD_OP(RD, ATTR, 3),     "cmpxattr")         \
+                                                                           \
+       /* write */                                                         \
+       f(SETXATTR,     __CEPH_OSD_OP(WR, ATTR, 1),     "setxattr")         \
+       f(SETXATTRS,    __CEPH_OSD_OP(WR, ATTR, 2),     "setxattrs")        \
+       f(RESETXATTRS,  __CEPH_OSD_OP(WR, ATTR, 3),     "resetxattrs")      \
+       f(RMXATTR,      __CEPH_OSD_OP(WR, ATTR, 4),     "rmxattr")          \
+                                                                           \
+       /** subop **/                                                       \
+       f(PULL,         __CEPH_OSD_OP1(SUB, 1),         "pull")             \
+       f(PUSH,         __CEPH_OSD_OP1(SUB, 2),         "push")             \
+       f(BALANCEREADS, __CEPH_OSD_OP1(SUB, 3),         "balance-reads")    \
+       f(UNBALANCEREADS, __CEPH_OSD_OP1(SUB, 4),       "unbalance-reads")  \
+       f(SCRUB,        __CEPH_OSD_OP1(SUB, 5),         "scrub")            \
+       f(SCRUB_RESERVE, __CEPH_OSD_OP1(SUB, 6),        "scrub-reserve")    \
+       f(SCRUB_UNRESERVE, __CEPH_OSD_OP1(SUB, 7),      "scrub-unreserve")  \
+       f(SCRUB_STOP,   __CEPH_OSD_OP1(SUB, 8),         "scrub-stop")       \
+       f(SCRUB_MAP,    __CEPH_OSD_OP1(SUB, 9),         "scrub-map")        \
+                                                                           \
+       /** lock **/                                                        \
+       f(WRLOCK,       __CEPH_OSD_OP(WR, LOCK, 1),     "wrlock")           \
+       f(WRUNLOCK,     __CEPH_OSD_OP(WR, LOCK, 2),     "wrunlock")         \
+       f(RDLOCK,       __CEPH_OSD_OP(WR, LOCK, 3),     "rdlock")           \
+       f(RDUNLOCK,     __CEPH_OSD_OP(WR, LOCK, 4),     "rdunlock")         \
+       f(UPLOCK,       __CEPH_OSD_OP(WR, LOCK, 5),     "uplock")           \
+       f(DNLOCK,       __CEPH_OSD_OP(WR, LOCK, 6),     "dnlock")           \
+                                                                           \
+       /** exec **/                                                        \
+       /* note: the RD bit here is wrong; see special-case below in helper */ \
+       f(CALL,         __CEPH_OSD_OP(RD, EXEC, 1),     "call")             \
+                                                                           \
+       /** pg **/                                                          \
+       f(PGLS,         __CEPH_OSD_OP(RD, PG, 1),       "pgls")             \
+       f(PGLS_FILTER,  __CEPH_OSD_OP(RD, PG, 2),       "pgls-filter")      \
+       f(PG_HITSET_LS, __CEPH_OSD_OP(RD, PG, 3),       "pg-hitset-ls")     \
+       f(PG_HITSET_GET, __CEPH_OSD_OP(RD, PG, 4),      "pg-hitset-get")
+
 enum {
-       /** data **/
-       /* read */
-       CEPH_OSD_OP_READ      = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1,
-       CEPH_OSD_OP_STAT      = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 2,
-       CEPH_OSD_OP_MAPEXT    = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 3,
-
-       /* fancy read */
-       CEPH_OSD_OP_MASKTRUNC   = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 4,
-       CEPH_OSD_OP_SPARSE_READ = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 5,
-
-       CEPH_OSD_OP_NOTIFY    = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 6,
-       CEPH_OSD_OP_NOTIFY_ACK = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 7,
-
-       /* versioning */
-       CEPH_OSD_OP_ASSERT_VER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 8,
-
-       /* write */
-       CEPH_OSD_OP_WRITE     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
-       CEPH_OSD_OP_WRITEFULL = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 2,
-       CEPH_OSD_OP_TRUNCATE  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 3,
-       CEPH_OSD_OP_ZERO      = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 4,
-       CEPH_OSD_OP_DELETE    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 5,
-
-       /* fancy write */
-       CEPH_OSD_OP_APPEND    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 6,
-       CEPH_OSD_OP_STARTSYNC = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 7,
-       CEPH_OSD_OP_SETTRUNC  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 8,
-       CEPH_OSD_OP_TRIMTRUNC = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 9,
-
-       CEPH_OSD_OP_TMAPUP  = CEPH_OSD_OP_MODE_RMW | CEPH_OSD_OP_TYPE_DATA | 10,
-       CEPH_OSD_OP_TMAPPUT = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 11,
-       CEPH_OSD_OP_TMAPGET = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 12,
-
-       CEPH_OSD_OP_CREATE  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 13,
-       CEPH_OSD_OP_ROLLBACK= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 14,
-
-       CEPH_OSD_OP_WATCH   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 15,
-
-       /* omap */
-       CEPH_OSD_OP_OMAPGETKEYS   = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 17,
-       CEPH_OSD_OP_OMAPGETVALS   = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 18,
-       CEPH_OSD_OP_OMAPGETHEADER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 19,
-       CEPH_OSD_OP_OMAPGETVALSBYKEYS  =
-         CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 20,
-       CEPH_OSD_OP_OMAPSETVALS   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 21,
-       CEPH_OSD_OP_OMAPSETHEADER = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 22,
-       CEPH_OSD_OP_OMAPCLEAR     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 23,
-       CEPH_OSD_OP_OMAPRMKEYS    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 24,
-       CEPH_OSD_OP_OMAP_CMP      = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 25,
-
-       /* hints */
-       CEPH_OSD_OP_SETALLOCHINT = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 35,
-
-       /** multi **/
-       CEPH_OSD_OP_CLONERANGE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_MULTI | 1,
-       CEPH_OSD_OP_ASSERT_SRC_VERSION = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 2,
-       CEPH_OSD_OP_SRC_CMPXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_MULTI | 3,
-
-       /** attrs **/
-       /* read */
-       CEPH_OSD_OP_GETXATTR  = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1,
-       CEPH_OSD_OP_GETXATTRS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 2,
-       CEPH_OSD_OP_CMPXATTR  = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 3,
-
-       /* write */
-       CEPH_OSD_OP_SETXATTR  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 1,
-       CEPH_OSD_OP_SETXATTRS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 2,
-       CEPH_OSD_OP_RESETXATTRS = CEPH_OSD_OP_MODE_WR|CEPH_OSD_OP_TYPE_ATTR | 3,
-       CEPH_OSD_OP_RMXATTR   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 4,
-
-       /** subop **/
-       CEPH_OSD_OP_PULL            = CEPH_OSD_OP_MODE_SUB | 1,
-       CEPH_OSD_OP_PUSH            = CEPH_OSD_OP_MODE_SUB | 2,
-       CEPH_OSD_OP_BALANCEREADS    = CEPH_OSD_OP_MODE_SUB | 3,
-       CEPH_OSD_OP_UNBALANCEREADS  = CEPH_OSD_OP_MODE_SUB | 4,
-       CEPH_OSD_OP_SCRUB           = CEPH_OSD_OP_MODE_SUB | 5,
-       CEPH_OSD_OP_SCRUB_RESERVE   = CEPH_OSD_OP_MODE_SUB | 6,
-       CEPH_OSD_OP_SCRUB_UNRESERVE = CEPH_OSD_OP_MODE_SUB | 7,
-       CEPH_OSD_OP_SCRUB_STOP      = CEPH_OSD_OP_MODE_SUB | 8,
-       CEPH_OSD_OP_SCRUB_MAP     = CEPH_OSD_OP_MODE_SUB | 9,
-
-       /** lock **/
-       CEPH_OSD_OP_WRLOCK    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 1,
-       CEPH_OSD_OP_WRUNLOCK  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 2,
-       CEPH_OSD_OP_RDLOCK    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 3,
-       CEPH_OSD_OP_RDUNLOCK  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 4,
-       CEPH_OSD_OP_UPLOCK    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 5,
-       CEPH_OSD_OP_DNLOCK    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6,
-
-       /** exec **/
-       /* note: the RD bit here is wrong; see special-case below in helper */
-       CEPH_OSD_OP_CALL    = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_EXEC | 1,
-
-       /** pg **/
-       CEPH_OSD_OP_PGLS      = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 1,
-       CEPH_OSD_OP_PGLS_FILTER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_PG | 2,
+#define GENERATE_ENUM_ENTRY(op, opcode, str)   CEPH_OSD_OP_##op = (opcode),
+__CEPH_FORALL_OSD_OPS(GENERATE_ENUM_ENTRY)
+#undef GENERATE_ENUM_ENTRY
 };
 
 static inline int ceph_osd_op_type_lock(int op)
index e50cc69..f8cceb9 100644 (file)
@@ -3,6 +3,7 @@ config CEPH_LIB
        depends on INET
        select LIBCRC32C
        select CRYPTO_AES
+       select CRYPTO_CBC
        select CRYPTO
        select KEYS
        default n
index 1675021..58fbfe1 100644 (file)
@@ -293,17 +293,20 @@ static int get_secret(struct ceph_crypto_key *dst, const char *name) {
                key_err = PTR_ERR(ukey);
                switch (key_err) {
                case -ENOKEY:
-                       pr_warning("ceph: Mount failed due to key not found: %s\n", name);
+                       pr_warn("ceph: Mount failed due to key not found: %s\n",
+                               name);
                        break;
                case -EKEYEXPIRED:
-                       pr_warning("ceph: Mount failed due to expired key: %s\n", name);
+                       pr_warn("ceph: Mount failed due to expired key: %s\n",
+                               name);
                        break;
                case -EKEYREVOKED:
-                       pr_warning("ceph: Mount failed due to revoked key: %s\n", name);
+                       pr_warn("ceph: Mount failed due to revoked key: %s\n",
+                               name);
                        break;
                default:
-                       pr_warning("ceph: Mount failed due to unknown key error"
-                              " %d: %s\n", key_err, name);
+                       pr_warn("ceph: Mount failed due to unknown key error %d: %s\n",
+                               key_err, name);
                }
                err = -EPERM;
                goto out;
@@ -433,7 +436,7 @@ ceph_parse_options(char *options, const char *dev_name,
 
                        /* misc */
                case Opt_osdtimeout:
-                       pr_warning("ignoring deprecated osdtimeout option\n");
+                       pr_warn("ignoring deprecated osdtimeout option\n");
                        break;
                case Opt_osdkeepalivetimeout:
                        opt->osd_keepalive_timeout = intval;
index 1348df9..3056020 100644 (file)
@@ -19,77 +19,12 @@ const char *ceph_entity_type_name(int type)
 const char *ceph_osd_op_name(int op)
 {
        switch (op) {
-       case CEPH_OSD_OP_READ: return "read";
-       case CEPH_OSD_OP_STAT: return "stat";
-       case CEPH_OSD_OP_MAPEXT: return "mapext";
-       case CEPH_OSD_OP_SPARSE_READ: return "sparse-read";
-       case CEPH_OSD_OP_NOTIFY: return "notify";
-       case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack";
-       case CEPH_OSD_OP_ASSERT_VER: return "assert-version";
-
-       case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
-
-       case CEPH_OSD_OP_CREATE: return "create";
-       case CEPH_OSD_OP_WRITE: return "write";
-       case CEPH_OSD_OP_DELETE: return "delete";
-       case CEPH_OSD_OP_TRUNCATE: return "truncate";
-       case CEPH_OSD_OP_ZERO: return "zero";
-       case CEPH_OSD_OP_WRITEFULL: return "writefull";
-       case CEPH_OSD_OP_ROLLBACK: return "rollback";
-
-       case CEPH_OSD_OP_APPEND: return "append";
-       case CEPH_OSD_OP_STARTSYNC: return "startsync";
-       case CEPH_OSD_OP_SETTRUNC: return "settrunc";
-       case CEPH_OSD_OP_TRIMTRUNC: return "trimtrunc";
-
-       case CEPH_OSD_OP_TMAPUP: return "tmapup";
-       case CEPH_OSD_OP_TMAPGET: return "tmapget";
-       case CEPH_OSD_OP_TMAPPUT: return "tmapput";
-       case CEPH_OSD_OP_WATCH: return "watch";
-
-       case CEPH_OSD_OP_CLONERANGE: return "clonerange";
-       case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version";
-       case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr";
-
-       case CEPH_OSD_OP_GETXATTR: return "getxattr";
-       case CEPH_OSD_OP_GETXATTRS: return "getxattrs";
-       case CEPH_OSD_OP_SETXATTR: return "setxattr";
-       case CEPH_OSD_OP_SETXATTRS: return "setxattrs";
-       case CEPH_OSD_OP_RESETXATTRS: return "resetxattrs";
-       case CEPH_OSD_OP_RMXATTR: return "rmxattr";
-       case CEPH_OSD_OP_CMPXATTR: return "cmpxattr";
-
-       case CEPH_OSD_OP_PULL: return "pull";
-       case CEPH_OSD_OP_PUSH: return "push";
-       case CEPH_OSD_OP_BALANCEREADS: return "balance-reads";
-       case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads";
-       case CEPH_OSD_OP_SCRUB: return "scrub";
-       case CEPH_OSD_OP_SCRUB_RESERVE: return "scrub-reserve";
-       case CEPH_OSD_OP_SCRUB_UNRESERVE: return "scrub-unreserve";
-       case CEPH_OSD_OP_SCRUB_STOP: return "scrub-stop";
-       case CEPH_OSD_OP_SCRUB_MAP: return "scrub-map";
-
-       case CEPH_OSD_OP_WRLOCK: return "wrlock";
-       case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
-       case CEPH_OSD_OP_RDLOCK: return "rdlock";
-       case CEPH_OSD_OP_RDUNLOCK: return "rdunlock";
-       case CEPH_OSD_OP_UPLOCK: return "uplock";
-       case CEPH_OSD_OP_DNLOCK: return "dnlock";
-
-       case CEPH_OSD_OP_CALL: return "call";
-
-       case CEPH_OSD_OP_PGLS: return "pgls";
-       case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter";
-       case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys";
-       case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals";
-       case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header";
-       case CEPH_OSD_OP_OMAPGETVALSBYKEYS: return "omap-get-vals-by-keys";
-       case CEPH_OSD_OP_OMAPSETVALS: return "omap-set-vals";
-       case CEPH_OSD_OP_OMAPSETHEADER: return "omap-set-header";
-       case CEPH_OSD_OP_OMAPCLEAR: return "omap-clear";
-       case CEPH_OSD_OP_OMAPRMKEYS: return "omap-rm-keys";
+#define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return (str);
+__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
+#undef GENERATE_CASE
+       default:
+               return "???";
        }
-       return "???";
 }
 
 const char *ceph_osd_state_name(int s)
index d1a62c6..d2d5255 100644 (file)
@@ -169,7 +169,8 @@ static int osdc_show(struct seq_file *s, void *pp)
 
                for (i = 0; i < req->r_num_ops; i++) {
                        opcode = req->r_ops[i].op;
-                       seq_printf(s, "\t%s", ceph_osd_op_name(opcode));
+                       seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
+                                  ceph_osd_op_name(opcode));
                }
 
                seq_printf(s, "\n");
index b2f571d..559c9f6 100644 (file)
@@ -292,7 +292,11 @@ int ceph_msgr_init(void)
        if (ceph_msgr_slab_init())
                return -ENOMEM;
 
-       ceph_msgr_wq = alloc_workqueue("ceph-msgr", 0, 0);
+       /*
+        * The number of active work items is limited by the number of
+        * connections, so leave @max_active at default.
+        */
+       ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
        if (ceph_msgr_wq)
                return 0;
 
@@ -1937,11 +1941,11 @@ static int process_banner(struct ceph_connection *con)
                   sizeof(con->peer_addr)) != 0 &&
            !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
              con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
-               pr_warning("wrong peer, want %s/%d, got %s/%d\n",
-                          ceph_pr_addr(&con->peer_addr.in_addr),
-                          (int)le32_to_cpu(con->peer_addr.nonce),
-                          ceph_pr_addr(&con->actual_peer_addr.in_addr),
-                          (int)le32_to_cpu(con->actual_peer_addr.nonce));
+               pr_warn("wrong peer, want %s/%d, got %s/%d\n",
+                       ceph_pr_addr(&con->peer_addr.in_addr),
+                       (int)le32_to_cpu(con->peer_addr.nonce),
+                       ceph_pr_addr(&con->actual_peer_addr.in_addr),
+                       (int)le32_to_cpu(con->actual_peer_addr.nonce));
                con->error_msg = "wrong peer at address";
                return -1;
        }
@@ -2302,7 +2306,7 @@ static int read_partial_message(struct ceph_connection *con)
 
                BUG_ON(!con->in_msg ^ skip);
                if (con->in_msg && data_len > con->in_msg->data_length) {
-                       pr_warning("%s skipping long message (%u > %zd)\n",
+                       pr_warn("%s skipping long message (%u > %zd)\n",
                                __func__, data_len, con->in_msg->data_length);
                        ceph_msg_put(con->in_msg);
                        con->in_msg = NULL;
@@ -2712,7 +2716,7 @@ static bool con_sock_closed(struct ceph_connection *con)
        CASE(OPEN);
        CASE(STANDBY);
        default:
-               pr_warning("%s con %p unrecognized state %lu\n",
+               pr_warn("%s con %p unrecognized state %lu\n",
                        __func__, con, con->state);
                con->error_msg = "unrecognized con state";
                BUG();
@@ -2828,8 +2832,8 @@ static void con_work(struct work_struct *work)
  */
 static void con_fault(struct ceph_connection *con)
 {
-       pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
-              ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
+       pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
+               ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
        dout("fault %p state %lu to peer %s\n",
             con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
 
@@ -3071,10 +3075,8 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
                return;
 
        WARN_ON(!list_empty(&data->links));
-       if (data->type == CEPH_MSG_DATA_PAGELIST) {
+       if (data->type == CEPH_MSG_DATA_PAGELIST)
                ceph_pagelist_release(data->pagelist);
-               kfree(data->pagelist);
-       }
        kmem_cache_free(ceph_msg_data_cache, data);
 }
 
index 61fcfc3..a83062c 100644 (file)
@@ -1182,10 +1182,10 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
                pr_info("alloc_msg unknown type %d\n", type);
                *skip = 1;
        } else if (front_len > m->front_alloc_len) {
-               pr_warning("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
-                          front_len, m->front_alloc_len,
-                          (unsigned int)con->peer_name.type,
-                          le64_to_cpu(con->peer_name.num));
+               pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
+                       front_len, m->front_alloc_len,
+                       (unsigned int)con->peer_name.type,
+                       le64_to_cpu(con->peer_name.num));
                ceph_msg_put(m);
                m = ceph_msg_new(type, front_len, GFP_NOFS, false);
        }
index 30f6faf..f3fc54e 100644 (file)
@@ -30,8 +30,11 @@ static void __send_queued(struct ceph_osd_client *osdc);
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req);
+static void __unregister_request(struct ceph_osd_client *osdc,
+                                struct ceph_osd_request *req);
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req);
+static void __enqueue_request(struct ceph_osd_request *req);
 static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);
 
@@ -428,68 +431,9 @@ EXPORT_SYMBOL(ceph_osdc_alloc_request);
 static bool osd_req_opcode_valid(u16 opcode)
 {
        switch (opcode) {
-       case CEPH_OSD_OP_READ:
-       case CEPH_OSD_OP_STAT:
-       case CEPH_OSD_OP_MAPEXT:
-       case CEPH_OSD_OP_MASKTRUNC:
-       case CEPH_OSD_OP_SPARSE_READ:
-       case CEPH_OSD_OP_NOTIFY:
-       case CEPH_OSD_OP_NOTIFY_ACK:
-       case CEPH_OSD_OP_ASSERT_VER:
-       case CEPH_OSD_OP_WRITE:
-       case CEPH_OSD_OP_WRITEFULL:
-       case CEPH_OSD_OP_TRUNCATE:
-       case CEPH_OSD_OP_ZERO:
-       case CEPH_OSD_OP_DELETE:
-       case CEPH_OSD_OP_APPEND:
-       case CEPH_OSD_OP_STARTSYNC:
-       case CEPH_OSD_OP_SETTRUNC:
-       case CEPH_OSD_OP_TRIMTRUNC:
-       case CEPH_OSD_OP_TMAPUP:
-       case CEPH_OSD_OP_TMAPPUT:
-       case CEPH_OSD_OP_TMAPGET:
-       case CEPH_OSD_OP_CREATE:
-       case CEPH_OSD_OP_ROLLBACK:
-       case CEPH_OSD_OP_WATCH:
-       case CEPH_OSD_OP_OMAPGETKEYS:
-       case CEPH_OSD_OP_OMAPGETVALS:
-       case CEPH_OSD_OP_OMAPGETHEADER:
-       case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
-       case CEPH_OSD_OP_OMAPSETVALS:
-       case CEPH_OSD_OP_OMAPSETHEADER:
-       case CEPH_OSD_OP_OMAPCLEAR:
-       case CEPH_OSD_OP_OMAPRMKEYS:
-       case CEPH_OSD_OP_OMAP_CMP:
-       case CEPH_OSD_OP_SETALLOCHINT:
-       case CEPH_OSD_OP_CLONERANGE:
-       case CEPH_OSD_OP_ASSERT_SRC_VERSION:
-       case CEPH_OSD_OP_SRC_CMPXATTR:
-       case CEPH_OSD_OP_GETXATTR:
-       case CEPH_OSD_OP_GETXATTRS:
-       case CEPH_OSD_OP_CMPXATTR:
-       case CEPH_OSD_OP_SETXATTR:
-       case CEPH_OSD_OP_SETXATTRS:
-       case CEPH_OSD_OP_RESETXATTRS:
-       case CEPH_OSD_OP_RMXATTR:
-       case CEPH_OSD_OP_PULL:
-       case CEPH_OSD_OP_PUSH:
-       case CEPH_OSD_OP_BALANCEREADS:
-       case CEPH_OSD_OP_UNBALANCEREADS:
-       case CEPH_OSD_OP_SCRUB:
-       case CEPH_OSD_OP_SCRUB_RESERVE:
-       case CEPH_OSD_OP_SCRUB_UNRESERVE:
-       case CEPH_OSD_OP_SCRUB_STOP:
-       case CEPH_OSD_OP_SCRUB_MAP:
-       case CEPH_OSD_OP_WRLOCK:
-       case CEPH_OSD_OP_WRUNLOCK:
-       case CEPH_OSD_OP_RDLOCK:
-       case CEPH_OSD_OP_RDUNLOCK:
-       case CEPH_OSD_OP_UPLOCK:
-       case CEPH_OSD_OP_DNLOCK:
-       case CEPH_OSD_OP_CALL:
-       case CEPH_OSD_OP_PGLS:
-       case CEPH_OSD_OP_PGLS_FILTER:
-               return true;
+#define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true;
+__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
+#undef GENERATE_CASE
        default:
                return false;
        }
@@ -892,6 +836,37 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
        return NULL;
 }
 
+static void __kick_linger_request(struct ceph_osd_request *req)
+{
+       struct ceph_osd_client *osdc = req->r_osdc;
+       struct ceph_osd *osd = req->r_osd;
+
+       /*
+        * Linger requests need to be resent with a new tid to avoid
+        * the dup op detection logic on the OSDs.  Achieve this with
+        * a re-register dance instead of open-coding.
+        */
+       ceph_osdc_get_request(req);
+       if (!list_empty(&req->r_linger_item))
+               __unregister_linger_request(osdc, req);
+       else
+               __unregister_request(osdc, req);
+       __register_request(osdc, req);
+       ceph_osdc_put_request(req);
+
+       /*
+        * Unless request has been registered as both normal and
+        * lingering, __unregister{,_linger}_request clears r_osd.
+        * However, here we need to preserve r_osd to make sure we
+        * requeue on the same OSD.
+        */
+       WARN_ON(req->r_osd || !osd);
+       req->r_osd = osd;
+
+       dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
+       __enqueue_request(req);
+}
+
 /*
  * Resubmit requests pending on the given osd.
  */
@@ -900,12 +875,14 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_request *req, *nreq;
        LIST_HEAD(resend);
+       LIST_HEAD(resend_linger);
        int err;
 
-       dout("__kick_osd_requests osd%d\n", osd->o_osd);
+       dout("%s osd%d\n", __func__, osd->o_osd);
        err = __reset_osd(osdc, osd);
        if (err)
                return;
+
        /*
         * Build up a list of requests to resend by traversing the
         * osd's list of requests.  Requests for a given object are
@@ -926,33 +903,32 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
        list_for_each_entry(req, &osd->o_requests, r_osd_item) {
                if (!req->r_sent)
                        break;
-               list_move_tail(&req->r_req_lru_item, &resend);
-               dout("requeueing %p tid %llu osd%d\n", req, req->r_tid,
-                    osd->o_osd);
-               if (!req->r_linger)
+
+               if (!req->r_linger) {
+                       dout("%s requeueing %p tid %llu\n", __func__, req,
+                            req->r_tid);
+                       list_move_tail(&req->r_req_lru_item, &resend);
                        req->r_flags |= CEPH_OSD_FLAG_RETRY;
+               } else {
+                       list_move_tail(&req->r_req_lru_item, &resend_linger);
+               }
        }
        list_splice(&resend, &osdc->req_unsent);
 
        /*
-        * Linger requests are re-registered before sending, which
-        * sets up a new tid for each.  We add them to the unsent
-        * list at the end to keep things in tid order.
+        * Both registered and not yet registered linger requests are
+        * enqueued with a new tid on the same OSD.  We add/move them
+        * to req_unsent/o_requests at the end to keep things in tid
+        * order.
         */
        list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
                                 r_linger_osd_item) {
-               /*
-                * reregister request prior to unregistering linger so
-                * that r_osd is preserved.
-                */
-               BUG_ON(!list_empty(&req->r_req_lru_item));
-               __register_request(osdc, req);
-               list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
-               list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
-               __unregister_linger_request(osdc, req);
-               dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
-                    osd->o_osd);
+               WARN_ON(!list_empty(&req->r_req_lru_item));
+               __kick_linger_request(req);
        }
+
+       list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
+               __kick_linger_request(req);
 }
 
 /*
@@ -1346,6 +1322,22 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap,
                                   &req->r_target_oid, pg_out);
 }
 
+static void __enqueue_request(struct ceph_osd_request *req)
+{
+       struct ceph_osd_client *osdc = req->r_osdc;
+
+       dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
+            req->r_osd ? req->r_osd->o_osd : -1);
+
+       if (req->r_osd) {
+               __remove_osd_from_lru(req->r_osd);
+               list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
+               list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
+       } else {
+               list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
+       }
+}
+
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
@@ -1423,13 +1415,7 @@ static int __map_request(struct ceph_osd_client *osdc,
                              &osdc->osdmap->osd_addr[o]);
        }
 
-       if (req->r_osd) {
-               __remove_osd_from_lru(req->r_osd);
-               list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
-               list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
-       } else {
-               list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
-       }
+       __enqueue_request(req);
        err = 1;   /* osd or pg changed */
 
 out:
@@ -1774,8 +1760,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        }
        bytes = le32_to_cpu(msg->hdr.data_len);
        if (payload_len != bytes) {
-               pr_warning("sum of op payload lens %d != data_len %d",
-                          payload_len, bytes);
+               pr_warn("sum of op payload lens %d != data_len %d\n",
+                       payload_len, bytes);
                goto bad_put;
        }
 
@@ -2313,24 +2299,19 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
        if (event) {
                event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
                if (!event_work) {
-                       dout("ERROR: could not allocate event_work\n");
-                       goto done_err;
+                       pr_err("couldn't allocate event_work\n");
+                       ceph_osdc_put_event(event);
+                       return;
                }
                INIT_WORK(&event_work->work, do_event_work);
                event_work->event = event;
                event_work->ver = ver;
                event_work->notify_id = notify_id;
                event_work->opcode = opcode;
-               if (!queue_work(osdc->notify_wq, &event_work->work)) {
-                       dout("WARNING: failed to queue notify event work\n");
-                       goto done_err;
-               }
-       }
 
-       return;
+               queue_work(osdc->notify_wq, &event_work->work);
+       }
 
-done_err:
-       ceph_osdc_put_event(event);
        return;
 
 bad:
@@ -2797,10 +2778,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        ceph_msg_revoke_incoming(req->r_reply);
 
        if (front_len > req->r_reply->front_alloc_len) {
-               pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n",
-                          front_len, req->r_reply->front_alloc_len,
-                          (unsigned int)con->peer_name.type,
-                          le64_to_cpu(con->peer_name.num));
+               pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n",
+                       front_len, req->r_reply->front_alloc_len,
+                       (unsigned int)con->peer_name.type,
+                       le64_to_cpu(con->peer_name.num));
                m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
                                 false);
                if (!m)
@@ -2823,8 +2804,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                        if (osd_data->pages &&
                                unlikely(osd_data->length < data_len)) {
 
-                               pr_warning("tid %lld reply has %d bytes "
-                                       "we had only %llu bytes ready\n",
+                               pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n",
                                        tid, data_len, osd_data->length);
                                *skip = 1;
                                ceph_msg_put(m);
index c547e46..b8c3fde 100644 (file)
@@ -521,11 +521,11 @@ static int decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi)
        ev = ceph_decode_8(p);  /* encoding version */
        cv = ceph_decode_8(p); /* compat version */
        if (ev < 5) {
-               pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
+               pr_warn("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv);
                return -EINVAL;
        }
        if (cv > 9) {
-               pr_warning("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
+               pr_warn("got v %d cv %d > 9 of ceph_pg_pool\n", ev, cv);
                return -EINVAL;
        }
        len = ceph_decode_32(p);
@@ -671,26 +671,26 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
        int i;
 
        state = krealloc(map->osd_state, max*sizeof(*state), GFP_NOFS);
+       if (!state)
+               return -ENOMEM;
+       map->osd_state = state;
+
        weight = krealloc(map->osd_weight, max*sizeof(*weight), GFP_NOFS);
-       addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
-       if (!state || !weight || !addr) {
-               kfree(state);
-               kfree(weight);
-               kfree(addr);
+       if (!weight)
+               return -ENOMEM;
+       map->osd_weight = weight;
 
+       addr = krealloc(map->osd_addr, max*sizeof(*addr), GFP_NOFS);
+       if (!addr)
                return -ENOMEM;
-       }
+       map->osd_addr = addr;
 
        for (i = map->max_osd; i < max; i++) {
-               state[i] = 0;
-               weight[i] = CEPH_OSD_OUT;
-               memset(addr + i, 0, sizeof(*addr));
+               map->osd_state[i] = 0;
+               map->osd_weight[i] = CEPH_OSD_OUT;
+               memset(map->osd_addr + i, 0, sizeof(*map->osd_addr));
        }
 
-       map->osd_state = state;
-       map->osd_weight = weight;
-       map->osd_addr = addr;
-
        if (map->osd_primary_affinity) {
                u32 *affinity;
 
@@ -698,11 +698,11 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
                                    max*sizeof(*affinity), GFP_NOFS);
                if (!affinity)
                        return -ENOMEM;
+               map->osd_primary_affinity = affinity;
 
                for (i = map->max_osd; i < max; i++)
-                       affinity[i] = CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
-
-               map->osd_primary_affinity = affinity;
+                       map->osd_primary_affinity[i] =
+                           CEPH_OSD_DEFAULT_PRIMARY_AFFINITY;
        }
 
        map->max_osd = max;
@@ -729,9 +729,9 @@ static int get_osdmap_client_data_v(void **p, void *end,
 
                ceph_decode_8_safe(p, end, struct_compat, e_inval);
                if (struct_compat > OSDMAP_WRAPPER_COMPAT_VER) {
-                       pr_warning("got v %d cv %d > %d of %s ceph_osdmap\n",
-                                  struct_v, struct_compat,
-                                  OSDMAP_WRAPPER_COMPAT_VER, prefix);
+                       pr_warn("got v %d cv %d > %d of %s ceph_osdmap\n",
+                               struct_v, struct_compat,
+                               OSDMAP_WRAPPER_COMPAT_VER, prefix);
                        return -EINVAL;
                }
                *p += 4; /* ignore wrapper struct_len */
@@ -739,9 +739,9 @@ static int get_osdmap_client_data_v(void **p, void *end,
                ceph_decode_8_safe(p, end, struct_v, e_inval);
                ceph_decode_8_safe(p, end, struct_compat, e_inval);
                if (struct_compat > OSDMAP_CLIENT_DATA_COMPAT_VER) {
-                       pr_warning("got v %d cv %d > %d of %s ceph_osdmap client data\n",
-                                  struct_v, struct_compat,
-                                  OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
+                       pr_warn("got v %d cv %d > %d of %s ceph_osdmap client data\n",
+                               struct_v, struct_compat,
+                               OSDMAP_CLIENT_DATA_COMPAT_VER, prefix);
                        return -EINVAL;
                }
                *p += 4; /* ignore client data struct_len */
@@ -751,8 +751,8 @@ static int get_osdmap_client_data_v(void **p, void *end,
                *p -= 1;
                ceph_decode_16_safe(p, end, version, e_inval);
                if (version < 6) {
-                       pr_warning("got v %d < 6 of %s ceph_osdmap\n", version,
-                                  prefix);
+                       pr_warn("got v %d < 6 of %s ceph_osdmap\n",
+                               version, prefix);
                        return -EINVAL;
                }
 
index 92866be..c7c220a 100644 (file)
@@ -1,5 +1,6 @@
 #include <linux/module.h>
 #include <linux/gfp.h>
+#include <linux/slab.h>
 #include <linux/pagemap.h>
 #include <linux/highmem.h>
 #include <linux/ceph/pagelist.h>
@@ -13,8 +14,10 @@ static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl)
        }
 }
 
-int ceph_pagelist_release(struct ceph_pagelist *pl)
+void ceph_pagelist_release(struct ceph_pagelist *pl)
 {
+       if (!atomic_dec_and_test(&pl->refcnt))
+               return;
        ceph_pagelist_unmap_tail(pl);
        while (!list_empty(&pl->head)) {
                struct page *page = list_first_entry(&pl->head, struct page,
@@ -23,7 +26,7 @@ int ceph_pagelist_release(struct ceph_pagelist *pl)
                __free_page(page);
        }
        ceph_pagelist_free_reserve(pl);
-       return 0;
+       kfree(pl);
 }
 EXPORT_SYMBOL(ceph_pagelist_release);