/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/striper.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, return
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);
	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);
	return -EINVAL;
}

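/*
 * Illustration (not part of the driver): together these two helpers
 * implement a saturating reference count.  A hypothetical caller would
 * treat a non-positive return as "reference not taken":
 *
 *	if (atomic_inc_return_safe(&ref) > 0) {
 *		...use the referenced object...
 *		atomic_dec_return_safe(&ref);
 *	}
 *
 * This is exactly the pattern used for parent_ref in
 * rbd_dev_parent_get() and rbd_dev_parent_put() below.
 */
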
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_OBJECT_MAP		(1ULL<<3)
#define RBD_FEATURE_FAST_DIFF		(1ULL<<4)
#define RBD_FEATURE_DEEP_FLATTEN	(1ULL<<5)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_OBJECT_MAP |	\
				 RBD_FEATURE_FAST_DIFF |	\
				 RBD_FEATURE_DEEP_FLATTEN |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

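/*
 * For illustration: a client refusing to map an image with unknown
 * feature bits would test against this mask.  This is a sketch of the
 * check performed when image features are fetched (the exact helper
 * lives elsewhere in this file):
 *
 *	u64 unsup = features & ~RBD_FEATURES_SUPPORTED;
 *	if (unsup)
 *		rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
 *			 unsup);
 */
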
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;
	const char	*pool_ns;	/* NULL if default, never "" */

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct pending_result {
	int			result;		/* first nonzero result */
	int			num_pending;
};

struct rbd_img_request;

enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};

enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
	OBJ_OP_ZEROOUT,
};

#define RBD_OBJ_FLAG_DELETION			(1U << 0)
#define RBD_OBJ_FLAG_COPYUP_ENABLED		(1U << 1)
#define RBD_OBJ_FLAG_COPYUP_ZEROS		(1U << 2)
#define RBD_OBJ_FLAG_MAY_EXIST			(1U << 3)
#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT	(1U << 4)

enum rbd_obj_read_state {
	RBD_OBJ_READ_START = 1,
	RBD_OBJ_READ_OBJECT,
	RBD_OBJ_READ_PARENT,
};

/*
 * Writes go through the following state machine to deal with
 * layering (ASCII diagram abridged; only the states and transitions
 * are reproduced here):
 *
 *   RBD_OBJ_WRITE_GUARD ---> RBD_OBJ_WRITE_READ_FROM_PARENT
 *           |                        |
 *           |                        v
 *           |         RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC
 *           |           (skipped if deep-copyup not needed)
 *           |                        |
 *           v                        v
 *          done <--- RBD_OBJ_WRITE_COPYUP_OPS    (copyup reads from
 *                                                 the parent image)
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * an assert_exists guard is needed or not (in some cases it's not
 * needed even if there is a parent).
 */

enum rbd_obj_write_state {
	RBD_OBJ_WRITE_START = 1,
	RBD_OBJ_WRITE_PRE_OBJECT_MAP,
	RBD_OBJ_WRITE_OBJECT,
	__RBD_OBJ_WRITE_COPYUP,
	RBD_OBJ_WRITE_COPYUP,
	RBD_OBJ_WRITE_POST_OBJECT_MAP,
};

enum rbd_obj_copyup_state {
	RBD_OBJ_COPYUP_START = 1,
	RBD_OBJ_COPYUP_READ_PARENT,
	__RBD_OBJ_COPYUP_OBJECT_MAPS,
	RBD_OBJ_COPYUP_OBJECT_MAPS,
	__RBD_OBJ_COPYUP_WRITE_OBJECT,
	RBD_OBJ_COPYUP_WRITE_OBJECT,
};

struct rbd_obj_request {
	struct ceph_object_extent ex;
	unsigned int		flags;	/* RBD_OBJ_FLAG_* */
	union {
		enum rbd_obj_read_state	 read_state;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	struct rbd_img_request	*img_request;
	struct ceph_file_extent	*img_extents;
	u32			num_img_extents;

	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
			u32			bvec_idx;
		};
	};

	enum rbd_obj_copyup_state copyup_state;
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct list_head	osd_reqs;	/* w/ r_private_item */

	struct mutex		state_mutex;
	struct pending_result	pending;
	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

enum rbd_img_state {
	RBD_IMG_START = 1,
	RBD_IMG_EXCLUSIVE_LOCK,
	__RBD_IMG_OBJECT_REQUESTS,
	RBD_IMG_OBJECT_REQUESTS,
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	enum obj_request_type	data_type;
	unsigned long		flags;
	enum rbd_img_state	state;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};

	struct list_head	lock_item;
	struct list_head	object_extents;	/* obj_req.ex structs */

	struct mutex		state_mutex;
	struct pending_result	pending;
	struct work_struct	work;
	int			work_result;
	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64                     size;
	u64                     features;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	spinlock_t		lock_lists_lock;
	struct list_head	acquiring_list;
	struct list_head	running_list;
	struct completion	acquire_wait;
	int			acquire_err;
	struct completion	releasing_wait;

	spinlock_t		object_map_lock;
	u8			*object_map;
	u64			object_map_size;	/* in objects */
	u64			object_map_flags;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

static struct ceph_snap_context rbd_empty_snapc = {
	.nref = REFCOUNT_INIT(1),
};

/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 */
static bool single_major = true;
module_param(single_major, bool, 0444);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");

static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
static ssize_t remove_store(struct bus_type *bus, const char *buf,
			    size_t count);
static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
				      size_t count);
static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
					 size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

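/*
 * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT == 4 each device
 * owns 16 minors (one whole-disk minor plus 15 partition minors).
 * dev_id 3 maps to minor 48, and any minor in 48..63 maps back to
 * dev_id 3.
 */
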
static bool rbd_is_snap(struct rbd_device *rbd_dev)
{
	return rbd_dev->spec->snap_id != CEPH_NOSNAP;
}

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	lockdep_assert_held(&rbd_dev->lock_rwsem);

	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static ssize_t supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR_WO(add);
static BUS_ATTR_WO(remove);
static BUS_ATTR_WO(add_single_major);
static BUS_ATTR_WO(remove_single_major);
static BUS_ATTR_RO(supported_features);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				 u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
				     u64 *snap_features);
static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);

/*
 * Return true if nothing else is pending.
 */
static bool pending_result_dec(struct pending_result *pending, int *result)
{
	rbd_assert(pending->num_pending > 0);

	if (*result && !pending->result)
		pending->result = *result;
	if (--pending->num_pending)
		return false;

	*result = pending->result;
	return true;
}

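/*
 * Illustrative usage (hypothetical caller): a request that fans out N
 * child operations sets num_pending = N and funnels every completion
 * through pending_result_dec().  Only the final completion sees "true"
 * and acts on the first nonzero result that was recorded:
 *
 *	if (pending_result_dec(&img_req->pending, &result))
 *		rbd_img_handle_request(img_req, result);
 */
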
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ro;

	if (get_user(ro, (int __user *)arg))
		return -EFAULT;

	/* Snapshots can't be marked read-write */
	if (rbd_is_snap(rbd_dev) && !ro)
		return -EROFS;

	/* Let blkdev_roset() handle it */
	return -ENOTTY;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_alloc_size,
	Opt_lock_timeout,
	Opt_last_int,
	/* int args above */
	Opt_pool_ns,
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_notrim,
	Opt_err,
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	{Opt_alloc_size, "alloc_size=%d"},
	{Opt_lock_timeout, "lock_timeout=%d"},
	/* int args above */
	{Opt_pool_ns, "_pool_ns=%s"},
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_notrim, "notrim"},
	{Opt_err, NULL}
};

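/*
 * Example (userspace view, for illustration): these tokens correspond
 * to the options string passed at map time, e.g.
 *
 *	rbd map mypool/myimage -o queue_depth=128,alloc_size=65536,ro
 */
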
struct rbd_options {
	int	queue_depth;
	int	alloc_size;
	unsigned long	lock_timeout;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
	bool	trim;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_ALLOC_SIZE_DEFAULT	(64 * 1024)
#define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false
#define RBD_TRIM_DEFAULT	true

struct parse_rbd_opts_ctx {
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
};

static int parse_rbd_opts_token(char *c, void *private)
{
	struct parse_rbd_opts_ctx *pctx = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		pctx->opts->queue_depth = intval;
		break;
	case Opt_alloc_size:
		if (intval < SECTOR_SIZE) {
			pr_err("alloc_size out of range\n");
			return -EINVAL;
		}
		if (!is_power_of_2(intval)) {
			pr_err("alloc_size must be a power of 2\n");
			return -EINVAL;
		}
		pctx->opts->alloc_size = intval;
		break;
	case Opt_lock_timeout:
		/* 0 is "wait forever" (i.e. infinite timeout) */
		if (intval < 0 || intval > INT_MAX / 1000) {
			pr_err("lock_timeout out of range\n");
			return -EINVAL;
		}
		pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
		break;
	case Opt_pool_ns:
		kfree(pctx->spec->pool_ns);
		pctx->spec->pool_ns = match_strdup(argstr);
		if (!pctx->spec->pool_ns)
			return -ENOMEM;
		break;
	case Opt_read_only:
		pctx->opts->read_only = true;
		break;
	case Opt_read_write:
		pctx->opts->read_only = false;
		break;
	case Opt_lock_on_read:
		pctx->opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		pctx->opts->exclusive = true;
		break;
	case Opt_notrim:
		pctx->opts->trim = false;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	case OBJ_OP_ZEROOUT:
		return "zeroout";
	default:
		return "???";
	}
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself to remove
 * the client from the client list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret;

	mutex_lock(&client_mutex);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		ceph_destroy_options(ceph_opts);

		/*
		 * Using an existing client.  Make sure ->pg_pools is up to
		 * date before we look up the pool id in do_rbd_add().
		 */
		ret = ceph_wait_for_latest_osdmap(rbdc->client,
					rbdc->client->options->mount_timeout);
		if (ret) {
			rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
			rbd_put_client(rbdc);
			rbdc = ERR_PTR(ret);
		}
	} else {
		rbdc = rbd_client_create(ceph_opts);
	}
	mutex_unlock(&client_mutex);

	return rbdc;
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}

static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

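/*
 * Worked example: an image with obj_order 22 has 1U << 22 == 4 MiB
 * objects; with no fancy striping the layout defaults to
 * stripe_unit == 4 MiB and stripe_count == 1, i.e. plain one-object
 * striping.
 */
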
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

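/*
 * Worked example: for snaps = { 9, 7, 4 } (descending, as kept by the
 * OSD), searching for id 7 returns index 1.  The comparator returns 1
 * when the key is *smaller* than the element, which steers bsearch()
 * to the right half, where the smaller ids live.
 */
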
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static void zero_bvec(struct bio_vec *bv)
{
	void *buf;
	unsigned long flags;

	buf = bvec_kmap_irq(bv, &flags);
	memset(buf, 0, bv->bv_len);
	flush_dcache_page(bv->bv_page);
	bvec_kunmap_irq(buf, &flags);
}

static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
{
	struct ceph_bio_iter it = *bio_pos;

	ceph_bio_iter_advance(&it, off);
	ceph_bio_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
{
	struct ceph_bvec_iter it = *bvec_pos;

	ceph_bvec_iter_advance(&it, off);
	ceph_bvec_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * (private) bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);

	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		BUG();
	}
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	list_del(&obj_request->ex.oe_item);
	rbd_assert(obj_request->img_request == img_request);
	rbd_obj_request_put(obj_request);
}

static void rbd_osd_submit(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
	     __func__, osd_req, obj_req, obj_req->ex.oe_objno,
	     obj_req->ex.oe_off, obj_req->ex.oe_len);
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return !obj_req->ex.oe_off &&
	       obj_req->ex.oe_len == rbd_dev->layout.object_size;
}

static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return obj_req->ex.oe_off + obj_req->ex.oe_len ==
					rbd_dev->layout.object_size;
}

/*
 * Must be called after rbd_obj_calc_img_extents().
 */
static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
{
	if (!obj_req->num_img_extents ||
	    (rbd_obj_is_entire(obj_req) &&
	     !obj_req->img_request->snapc->num_snaps))
		return false;

	return true;
}

static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
{
	return ceph_file_extents_bytes(obj_req->img_extents,
				       obj_req->num_img_extents);
}

static bool rbd_img_is_write(struct rbd_img_request *img_req)
{
	switch (img_req->op_type) {
	case OBJ_OP_READ:
		return false;
	case OBJ_OP_WRITE:
	case OBJ_OP_DISCARD:
	case OBJ_OP_ZEROOUT:
		return true;
	default:
		BUG();
	}
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;
	int result;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);

	/*
	 * Writes aren't allowed to return a data payload.  In some
	 * guarded write cases (e.g. stat + zero on an empty object)
	 * a stat response makes it through, but we don't care.
	 */
	if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
		result = 0;
	else
		result = osd_req->r_result;

	rbd_obj_handle_request(obj_req, result);
}

static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;

	osd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req->r_snapid = obj_request->img_request->snap_id;
}

static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts64(&osd_req->r_mtime);
	osd_req->r_data_offset = obj_request->ex.oe_off;
}

static struct ceph_osd_request *
__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
			  struct ceph_snap_context *snapc, int num_ops)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	const char *name_format = rbd_dev->image_format == 1 ?
				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
	int ret;

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
	if (!req)
		return ERR_PTR(-ENOMEM);

	list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
	req->r_callback = rbd_osd_req_callback;
	req->r_priv = obj_req;

	/*
	 * Data objects may be stored in a separate pool, but always in
	 * the same namespace in that pool as the header in its pool.
	 */
	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
	req->r_base_oloc.pool = rbd_dev->layout.pool_id;

	ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
			       rbd_dev->header.object_prefix,
			       obj_req->ex.oe_objno);
	if (ret)
		return ERR_PTR(ret);

	return req;
}

static struct ceph_osd_request *
rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
{
	return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
					 num_ops);
}

static struct rbd_obj_request *rbd_obj_request_create(void)
{
	struct rbd_obj_request *obj_request;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request)
		return NULL;

	ceph_object_extent_init(&obj_request->ex);
	INIT_LIST_HEAD(&obj_request->osd_reqs);
	mutex_init(&obj_request->state_mutex);
	kref_init(&obj_request->kref);

	dout("%s %p\n", __func__, obj_request);
	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_request *osd_req;
	u32 i;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	while (!list_empty(&obj_request->osd_reqs)) {
		osd_req = list_first_entry(&obj_request->osd_reqs,
				    struct ceph_osd_request, r_private_item);
		list_del_init(&osd_req->r_private_item);
		ceph_osdc_put_request(osd_req);
	}

	switch (obj_request->img_request->data_type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_BVECS:
		break;		/* Nothing to do */
	case OBJ_REQUEST_OWN_BVECS:
		kfree(obj_request->bvec_pos.bvecs);
		break;
	default:
		BUG();
	}

	kfree(obj_request->img_extents);
	if (obj_request->copyup_bvecs) {
		for (i = 0; i < obj_request->copyup_bvec_count; i++) {
			if (obj_request->copyup_bvecs[i].bv_page)
				__free_page(obj_request->copyup_bvecs[i].bv_page);
		}
		kfree(obj_request->copyup_bvecs);
	}

	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}

/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	down_read(&rbd_dev->header_rwsem);
	if (rbd_dev->parent_overlap)
		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	up_read(&rbd_dev->header_rwsem);

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return counter > 0;
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rbd_dev = rbd_dev;
	img_request->op_type = op_type;
	if (!rbd_img_is_write(img_request))
		img_request->snap_id = rbd_dev->spec->snap_id;
	else
		img_request->snapc = snapc;

	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);

	INIT_LIST_HEAD(&img_request->lock_item);
	INIT_LIST_HEAD(&img_request->object_extents);
	mutex_init(&img_request->state_mutex);
	kref_init(&img_request->kref);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	WARN_ON(!list_empty(&img_request->lock_item));
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (rbd_img_is_write(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

#define BITS_PER_OBJ	2
#define OBJS_PER_BYTE	(BITS_PER_BYTE / BITS_PER_OBJ)
#define OBJ_MASK	((1 << BITS_PER_OBJ) - 1)

static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
				   u64 *index, u8 *shift)
{
	u32 off;

	rbd_assert(objno < rbd_dev->object_map_size);
	*index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
	*shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
}

static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
{
	u64 index;
	u8 shift;

	lockdep_assert_held(&rbd_dev->object_map_lock);
	__rbd_object_map_index(rbd_dev, objno, &index, &shift);
	return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
}

static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
{
	u64 index;
	u8 shift;
	u8 *p;

	lockdep_assert_held(&rbd_dev->object_map_lock);
	rbd_assert(!(val & ~OBJ_MASK));

	__rbd_object_map_index(rbd_dev, objno, &index, &shift);
	p = &rbd_dev->object_map[index];
	*p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
}

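/*
 * Worked example: with BITS_PER_OBJ == 2 there are 4 object states per
 * byte, most significant bits first.  For objno 5: index == 1,
 * off == 1, shift == (4 - 1 - 1) * 2 == 4, so the state occupies
 * bits 5:4 of object_map[1].
 */
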
static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
{
	u8 state;

	spin_lock(&rbd_dev->object_map_lock);
	state = __rbd_object_map_get(rbd_dev, objno);
	spin_unlock(&rbd_dev->object_map_lock);
	return state;
}

static bool use_object_map(struct rbd_device *rbd_dev)
{
	return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
		!(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
}

static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
{
	u8 state;

	/* fall back to default logic if object map is disabled or invalid */
	if (!use_object_map(rbd_dev))
		return true;

	state = rbd_object_map_get(rbd_dev, objno);
	return state != OBJECT_NONEXISTENT;
}

static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
				struct ceph_object_id *oid)
{
	if (snap_id == CEPH_NOSNAP)
		ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
				rbd_dev->spec->image_id);
	else
		ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
				rbd_dev->spec->image_id, snap_id);
}

static int rbd_object_map_lock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	CEPH_DEFINE_OID_ONSTACK(oid);
	u8 lock_type;
	char *lock_tag;
	struct ceph_locker *lockers;
	u32 num_lockers;
	bool broke_lock = false;
	int ret;

	rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);

again:
	ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
			    CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
	if (ret != -EBUSY || broke_lock) {
		if (ret == -EEXIST)
			ret = 0; /* already locked by myself */
		if (ret)
			rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
		return ret;
	}

	ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
				 RBD_LOCK_NAME, &lock_type, &lock_tag,
				 &lockers, &num_lockers);
	if (ret) {
		if (ret == -ENOENT)
			goto again;

		rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
		return ret;
	}

	kfree(lock_tag);
	if (num_lockers == 0)
		goto again;

	rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
		 ENTITY_NAME(lockers[0].id.name));

	ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
				  RBD_LOCK_NAME, lockers[0].id.cookie,
				  &lockers[0].id.name);
	ceph_free_lockers(lockers, num_lockers);
	if (ret) {
		if (ret == -ENOENT)
			goto again;

		rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
		return ret;
	}

	broke_lock = true;
	goto again;
}

static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	CEPH_DEFINE_OID_ONSTACK(oid);
	int ret;

	rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);

	ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
			      "");
	if (ret && ret != -ENOENT)
		rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
}

static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
{
	u8 struct_v;
	u32 struct_len;
	u32 header_len;
	void *header_end;
	int ret;

	ceph_decode_32_safe(p, end, header_len, e_inval);
	header_end = *p + header_len;

	ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
				  &struct_len);
	if (ret)
		return ret;

	ceph_decode_64_safe(p, end, *object_map_size, e_inval);

	*p = header_end;
	return 0;

e_inval:
	return -EINVAL;
}

static int __rbd_object_map_load(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	CEPH_DEFINE_OID_ONSTACK(oid);
	struct page **pages;
	void *p, *end;
	size_t reply_len;
	u64 num_objects;
	u64 object_map_bytes;
	u64 object_map_size;
	int num_pages;
	int ret;

	rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);

	num_objects = ceph_get_num_objects(&rbd_dev->layout,
					   rbd_dev->mapping.size);
	object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
					    BITS_PER_BYTE);
	num_pages = calc_pages_for(0, object_map_bytes) + 1;
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	reply_len = num_pages * PAGE_SIZE;
	rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
	ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
			     "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
			     NULL, 0, pages, &reply_len);
	if (ret)
		goto out;

	p = page_address(pages[0]);
	end = p + min(reply_len, (size_t)PAGE_SIZE);
	ret = decode_object_map_header(&p, end, &object_map_size);
	if (ret)
		goto out;

	if (object_map_size != num_objects) {
		rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
			 object_map_size, num_objects);
		ret = -EINVAL;
		goto out;
	}

	if (offset_in_page(p) + object_map_bytes > reply_len) {
		ret = -EINVAL;
		goto out;
	}

	rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
	if (!rbd_dev->object_map) {
		ret = -ENOMEM;
		goto out;
	}

	rbd_dev->object_map_size = object_map_size;
	ceph_copy_from_page_vector(pages, rbd_dev->object_map,
				   offset_in_page(p), object_map_bytes);

out:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}

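/*
 * Size check, for illustration: a 1 TiB image with 4 MiB objects has
 * 2^18 == 262144 objects; at 2 bits per object the object map body is
 * 64 KiB, so the call above reads it in a handful of pages.
 */
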
static void rbd_object_map_free(struct rbd_device *rbd_dev)
{
	kvfree(rbd_dev->object_map);
	rbd_dev->object_map = NULL;
	rbd_dev->object_map_size = 0;
}

static int rbd_object_map_load(struct rbd_device *rbd_dev)
{
	int ret;

	ret = __rbd_object_map_load(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_v2_get_flags(rbd_dev);
	if (ret) {
		rbd_object_map_free(rbd_dev);
		return ret;
	}

	if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
		rbd_warn(rbd_dev, "object map is invalid");

	return 0;
}

static int rbd_object_map_open(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_object_map_lock(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_object_map_load(rbd_dev);
	if (ret) {
		rbd_object_map_unlock(rbd_dev);
		return ret;
	}

	return 0;
}

static void rbd_object_map_close(struct rbd_device *rbd_dev)
{
	rbd_object_map_free(rbd_dev);
	rbd_object_map_unlock(rbd_dev);
}

/*
 * This function needs snap_id (or more precisely just something to
 * distinguish between HEAD and snapshot object maps), new_state and
 * current_state that were passed to rbd_object_map_update().
 *
 * To avoid allocating and stashing a context we piggyback on the OSD
 * request.  A HEAD update has two ops (assert_locked).  For new_state
 * and current_state we decode our own object_map_update op, encoded in
 * rbd_cls_object_map_update().
 */
static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
					struct ceph_osd_request *osd_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	struct ceph_osd_data *osd_data;
	u64 objno;
	u8 state, new_state, uninitialized_var(current_state);
	bool has_current_state;
	void *p;

	if (osd_req->r_result)
		return osd_req->r_result;

	/*
	 * Nothing to do for a snapshot object map.
	 */
	if (osd_req->r_num_ops == 1)
		return 0;

	/*
	 * Update in-memory HEAD object map.
	 */
	rbd_assert(osd_req->r_num_ops == 2);
	osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
	rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);

	p = page_address(osd_data->pages[0]);
	objno = ceph_decode_64(&p);
	rbd_assert(objno == obj_req->ex.oe_objno);
	rbd_assert(ceph_decode_64(&p) == objno + 1);
	new_state = ceph_decode_8(&p);
	has_current_state = ceph_decode_8(&p);
	if (has_current_state)
		current_state = ceph_decode_8(&p);

	spin_lock(&rbd_dev->object_map_lock);
	state = __rbd_object_map_get(rbd_dev, objno);
	if (!has_current_state || current_state == state ||
	    (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
		__rbd_object_map_set(rbd_dev, objno, new_state);
	spin_unlock(&rbd_dev->object_map_lock);

	return 0;
}

static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;
	int result;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);

	result = rbd_object_map_update_finish(obj_req, osd_req);
	rbd_obj_handle_request(obj_req, result);
}

static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
{
	u8 state = rbd_object_map_get(rbd_dev, objno);

	if (state == new_state ||
	    (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
	    (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
		return false;

	return true;
}

static int rbd_cls_object_map_update(struct ceph_osd_request *req,
				     int which, u64 objno, u8 new_state,
				     const u8 *current_state)
{
	struct page **pages;
	void *p, *start;
	int ret;

	ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
	if (ret)
		return ret;

	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	p = start = page_address(pages[0]);
	ceph_encode_64(&p, objno);
	ceph_encode_64(&p, objno + 1);
	ceph_encode_8(&p, new_state);
	if (current_state) {
		ceph_encode_8(&p, 1);
		ceph_encode_8(&p, *current_state);
	} else {
		ceph_encode_8(&p, 0);
	}

	osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
					  false, true);
	return 0;
}

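/*
 * Encoded payload layout, for illustration: [start objno (le64)]
 * [end objno (le64)] [new_state (u8)] [has current_state (u8)]
 * [current_state (u8), optional].  Updating objno 5 with no
 * current-state assertion therefore encodes 8 + 8 + 1 + 1 == 18 bytes.
 */
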
/*
 * Return:
 *   0 - object map update sent
 *   1 - object map update isn't needed
 *  <0 - error occurred
 */
static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
				 u8 new_state, const u8 *current_state)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	int num_ops = 1;
	int which = 0;
	int ret;

	if (snap_id == CEPH_NOSNAP) {
		if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
			return 1;

		num_ops++; /* assert_locked */
	}

	req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
	if (!req)
		return -ENOMEM;

	list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
	req->r_callback = rbd_object_map_callback;
	req->r_priv = obj_req;

	rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
	req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts64(&req->r_mtime);

	if (snap_id == CEPH_NOSNAP) {
		/*
		 * Protect against possible race conditions during lock
		 * ownership transitions.
		 */
		ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
					     CEPH_CLS_LOCK_EXCLUSIVE, "", "");
		if (ret)
			return ret;
	}

	ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
					new_state, current_state);
	if (ret)
		return ret;

	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
	if (ret)
		return ret;

	ceph_osdc_start_request(osdc, req, false);
	return 0;
}

static void prune_extents(struct ceph_file_extent *img_extents,
			  u32 *num_img_extents, u64 overlap)
{
	u32 cnt = *num_img_extents;

	/* drop extents completely beyond the overlap */
	while (cnt && img_extents[cnt - 1].fe_off >= overlap)
		cnt--;

	if (cnt) {
		struct ceph_file_extent *ex = &img_extents[cnt - 1];

		/* trim final overlapping extent */
		if (ex->fe_off + ex->fe_len > overlap)
			ex->fe_len = overlap - ex->fe_off;
	}

	*num_img_extents = cnt;
}

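/*
 * Worked example: with overlap == 100, extents { 90~20, 110~5 } become
 * { 90~10 }: the second extent starts at or beyond the overlap and is
 * dropped, and the first is trimmed so it ends exactly at 100.
 */
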
/*
 * Determine the byte range(s) covered by either just the object extent
 * or the entire object in the parent image.
 */
static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
				    bool entire)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

	if (!rbd_dev->parent_overlap)
		return 0;

	ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
				  entire ? 0 : obj_req->ex.oe_off,
				  entire ? rbd_dev->layout.object_size :
							obj_req->ex.oe_len,
				  &obj_req->img_extents,
				  &obj_req->num_img_extents);
	if (ret)
		return ret;

	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
		      rbd_dev->parent_overlap);
	return 0;
}

static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		osd_req_op_extent_osd_data_bio(osd_req, which,
					       &obj_req->bio_pos,
					       obj_req->ex.oe_len);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
							obj_req->ex.oe_len);
		rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
		osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
						    &obj_req->bvec_pos);
		break;
	default:
		BUG();
	}
}

static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
{
	struct page **pages;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
	osd_req_op_raw_data_in_pages(osd_req, which, pages,
				     8 + sizeof(struct ceph_timespec),
				     0, false, true);
	return 0;
}

static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
				u32 bytes)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;
	int ret;

	ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
	if (ret)
		return ret;

	osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
					  obj_req->copyup_bvec_count, bytes);
	return 0;
}

static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
{
	obj_req->read_state = RBD_OBJ_READ_START;
	return 0;
}

static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
				      int which)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	u16 opcode;

	if (!use_object_map(rbd_dev) ||
	    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
		osd_req_op_alloc_hint_init(osd_req, which++,
					   rbd_dev->layout.object_size,
					   rbd_dev->layout.object_size);
	}

	if (rbd_obj_is_entire(obj_req))
		opcode = CEPH_OSD_OP_WRITEFULL;
	else
		opcode = CEPH_OSD_OP_WRITE;

	osd_req_op_extent_init(osd_req, which, opcode,
			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
	rbd_osd_setup_data(osd_req, which);
}

static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
{
	int ret;

	/* reverse map the entire object onto the parent */
	ret = rbd_obj_calc_img_extents(obj_req, true);
	if (ret)
		return ret;

	if (rbd_obj_copyup_enabled(obj_req))
		obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;

	obj_req->write_state = RBD_OBJ_WRITE_START;
	return 0;
}

static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
{
	return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
					  CEPH_OSD_OP_ZERO;
}

static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
					int which)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
		rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
		osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
	} else {
		osd_req_op_extent_init(osd_req, which,
				       truncate_or_zero_opcode(obj_req),
				       obj_req->ex.oe_off, obj_req->ex.oe_len,
				       0, 0);
	}
}

static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	u64 off, next_off;
	int ret;

	/*
	 * Align the range to alloc_size boundary and punt on discards
	 * that are too small to free up any space.
	 *
	 * alloc_size == object_size && is_tail() is a special case for
	 * filestore with filestore_punch_hole = false, needed to allow
	 * truncate (in addition to delete).
	 */
	if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
	    !rbd_obj_is_tail(obj_req)) {
		off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
		next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
				      rbd_dev->opts->alloc_size);
		if (off >= next_off)
			return 1;

		dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
		     obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
		     off, next_off - off);
		obj_req->ex.oe_off = off;
		obj_req->ex.oe_len = next_off - off;
	}

	/* reverse map the entire object onto the parent */
	ret = rbd_obj_calc_img_extents(obj_req, true);
	if (ret)
		return ret;

	obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
		obj_req->flags |= RBD_OBJ_FLAG_DELETION;

	obj_req->write_state = RBD_OBJ_WRITE_START;
	return 0;
}

2468 static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2471 struct rbd_obj_request *obj_req = osd_req->r_priv;
2474 if (rbd_obj_is_entire(obj_req)) {
2475 if (obj_req->num_img_extents) {
2476 if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2477 osd_req_op_init(osd_req, which++,
2478 CEPH_OSD_OP_CREATE, 0);
2479 opcode = CEPH_OSD_OP_TRUNCATE;
2481 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2482 osd_req_op_init(osd_req, which++,
2483 CEPH_OSD_OP_DELETE, 0);
2487 opcode = truncate_or_zero_opcode(obj_req);
2491 osd_req_op_extent_init(osd_req, which, opcode,
2492 obj_req->ex.oe_off, obj_req->ex.oe_len,
2496 static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2500 /* reverse map the entire object onto the parent */
2501 ret = rbd_obj_calc_img_extents(obj_req, true);
2505 if (rbd_obj_copyup_enabled(obj_req))
2506 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2507 if (!obj_req->num_img_extents) {
2508 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2509 if (rbd_obj_is_entire(obj_req))
2510 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2513 obj_req->write_state = RBD_OBJ_WRITE_START;
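/*
 * Return the number of osd ops that the write-path setup will add for
 * this object request, not counting the stat op that may precede them
 * for copyup.
 */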
2517 static int count_write_ops(struct rbd_obj_request *obj_req)
2519 struct rbd_img_request *img_req = obj_req->img_request;
2521 switch (img_req->op_type) {
2523 if (!use_object_map(img_req->rbd_dev) ||
2524 !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2525 return 2; /* setallochint + write/writefull */
2527 return 1; /* write/writefull */
2528 case OBJ_OP_DISCARD:
2529 return 1; /* delete/truncate/zero */
2530 case OBJ_OP_ZEROOUT:
2531 if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2532 !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2533 return 2; /* create + truncate */
2535 return 1; /* delete/truncate/zero */
2541 static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2544 struct rbd_obj_request *obj_req = osd_req->r_priv;
2546 switch (obj_req->img_request->op_type) {
2548 __rbd_osd_setup_write_ops(osd_req, which);
2550 case OBJ_OP_DISCARD:
2551 __rbd_osd_setup_discard_ops(osd_req, which);
2553 case OBJ_OP_ZEROOUT:
2554 __rbd_osd_setup_zeroout_ops(osd_req, which);
/*
 * Prune the list of object requests (adjust offset and/or length, drop
 * redundant requests).  Prepare object request state machines and image
 * request state machine for execution.
 */
2566 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2568 struct rbd_obj_request *obj_req, *next_obj_req;
2571 for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2572 switch (img_req->op_type) {
2574 ret = rbd_obj_init_read(obj_req);
2577 ret = rbd_obj_init_write(obj_req);
2579 case OBJ_OP_DISCARD:
2580 ret = rbd_obj_init_discard(obj_req);
2582 case OBJ_OP_ZEROOUT:
2583 ret = rbd_obj_init_zeroout(obj_req);
2591 rbd_img_obj_request_del(img_req, obj_req);
2596 img_req->state = RBD_IMG_START;
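/*
 * The fill context bundles the data position iterator with the
 * callbacks used to carve a bio chain or bio_vec array into per-object
 * chunks: set_pos_fn for the nocopy path, count_fn + copy_fn for the
 * copy path used with fancy striping.
 */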
2600 union rbd_img_fill_iter {
2601 struct ceph_bio_iter bio_iter;
2602 struct ceph_bvec_iter bvec_iter;
2605 struct rbd_img_fill_ctx {
2606 enum obj_request_type pos_type;
2607 union rbd_img_fill_iter *pos;
2608 union rbd_img_fill_iter iter;
2609 ceph_object_extent_fn_t set_pos_fn;
2610 ceph_object_extent_fn_t count_fn;
2611 ceph_object_extent_fn_t copy_fn;
2614 static struct ceph_object_extent *alloc_object_extent(void *arg)
2616 struct rbd_img_request *img_req = arg;
2617 struct rbd_obj_request *obj_req;
2619 obj_req = rbd_obj_request_create();
2623 rbd_img_obj_request_add(img_req, obj_req);
2624 return &obj_req->ex;
/*
 * While su != os && sc == 1 is technically not fancy (it's the same
 * layout as su == os && sc == 1), we can't use the nocopy path for it
 * because ->set_pos_fn() should be called only once per object.
 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
 * treat su != os && sc == 1 as fancy.
 */
static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
{
	return l->stripe_unit != l->object_size;
}
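/*
 * Example (values illustrative): su = 64K, sc = 4, os = 4M is fancy --
 * a single bio_vec may straddle a stripe unit boundary, so the data
 * must be copied into per-object bio_vec arrays instead of being
 * referenced in place.
 */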
2639 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2640 struct ceph_file_extent *img_extents,
2641 u32 num_img_extents,
2642 struct rbd_img_fill_ctx *fctx)
2647 img_req->data_type = fctx->pos_type;
	/*
	 * Create object requests and set each object request's starting
	 * position in the provided bio (list) or bio_vec array.
	 */
2653 fctx->iter = *fctx->pos;
2654 for (i = 0; i < num_img_extents; i++) {
2655 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2656 img_extents[i].fe_off,
2657 img_extents[i].fe_len,
2658 &img_req->object_extents,
2659 alloc_object_extent, img_req,
2660 fctx->set_pos_fn, &fctx->iter);
2665 return __rbd_img_fill_request(img_req);
/*
 * Map a list of image extents to a list of object extents, create the
 * corresponding object requests (normally each to a different object,
 * but not always) and add them to @img_req.  For each object request,
 * set up its data descriptor to point to the corresponding chunk(s) of
 * @fctx->pos data buffer.
 *
 * Because ceph_file_to_extents() will merge adjacent object extents
 * together, each object request's data descriptor may point to multiple
 * different chunks of @fctx->pos data buffer.
 *
 * @fctx->pos data buffer is assumed to be large enough.
 */
2681 static int rbd_img_fill_request(struct rbd_img_request *img_req,
2682 struct ceph_file_extent *img_extents,
2683 u32 num_img_extents,
2684 struct rbd_img_fill_ctx *fctx)
2686 struct rbd_device *rbd_dev = img_req->rbd_dev;
2687 struct rbd_obj_request *obj_req;
2691 if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2692 !rbd_layout_is_fancy(&rbd_dev->layout))
2693 return rbd_img_fill_request_nocopy(img_req, img_extents,
2694 num_img_extents, fctx);
2696 img_req->data_type = OBJ_REQUEST_OWN_BVECS;
	/*
	 * Create object requests and determine ->bvec_count for each object
	 * request.  Note that ->bvec_count sum over all object requests may
	 * be greater than the number of bio_vecs in the provided bio (list)
	 * or bio_vec array because when mapped, those bio_vecs can straddle
	 * stripe unit boundaries.
	 */
2705 fctx->iter = *fctx->pos;
2706 for (i = 0; i < num_img_extents; i++) {
2707 ret = ceph_file_to_extents(&rbd_dev->layout,
2708 img_extents[i].fe_off,
2709 img_extents[i].fe_len,
2710 &img_req->object_extents,
2711 alloc_object_extent, img_req,
2712 fctx->count_fn, &fctx->iter);
2717 for_each_obj_request(img_req, obj_req) {
2718 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2719 sizeof(*obj_req->bvec_pos.bvecs),
2721 if (!obj_req->bvec_pos.bvecs)
	/*
	 * Fill in each object request's private bio_vec array, splitting and
	 * rearranging the provided bio_vecs in stripe unit chunks as needed.
	 */
2729 fctx->iter = *fctx->pos;
2730 for (i = 0; i < num_img_extents; i++) {
2731 ret = ceph_iterate_extents(&rbd_dev->layout,
2732 img_extents[i].fe_off,
2733 img_extents[i].fe_len,
2734 &img_req->object_extents,
2735 fctx->copy_fn, &fctx->iter);
2740 return __rbd_img_fill_request(img_req);
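/*
 * Discard and zeroout requests carry no data payload, so the fill
 * context gets a dummy position iterator and OBJ_REQUEST_NODATA.
 */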
static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
			       u64 off, u64 len)
{
	struct ceph_file_extent ex = { off, len };
	union rbd_img_fill_iter dummy;
	struct rbd_img_fill_ctx fctx = {
		.pos_type = OBJ_REQUEST_NODATA,
		.pos = &dummy,
	};

	return rbd_img_fill_request(img_req, &ex, 1, &fctx);
}
2756 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2758 struct rbd_obj_request *obj_req =
2759 container_of(ex, struct rbd_obj_request, ex);
2760 struct ceph_bio_iter *it = arg;
2762 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2763 obj_req->bio_pos = *it;
2764 ceph_bio_iter_advance(it, bytes);
2767 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2769 struct rbd_obj_request *obj_req =
2770 container_of(ex, struct rbd_obj_request, ex);
2771 struct ceph_bio_iter *it = arg;
2773 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2774 ceph_bio_iter_advance_step(it, bytes, ({
2775 obj_req->bvec_count++;
2780 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2782 struct rbd_obj_request *obj_req =
2783 container_of(ex, struct rbd_obj_request, ex);
2784 struct ceph_bio_iter *it = arg;
2786 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2787 ceph_bio_iter_advance_step(it, bytes, ({
2788 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2789 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2793 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2794 struct ceph_file_extent *img_extents,
2795 u32 num_img_extents,
2796 struct ceph_bio_iter *bio_pos)
2798 struct rbd_img_fill_ctx fctx = {
2799 .pos_type = OBJ_REQUEST_BIO,
2800 .pos = (union rbd_img_fill_iter *)bio_pos,
2801 .set_pos_fn = set_bio_pos,
2802 .count_fn = count_bio_bvecs,
2803 .copy_fn = copy_bio_bvecs,
	return rbd_img_fill_request(img_req, img_extents, num_img_extents,
				    &fctx);
}
2810 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2811 u64 off, u64 len, struct bio *bio)
2813 struct ceph_file_extent ex = { off, len };
2814 struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2816 return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2819 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2821 struct rbd_obj_request *obj_req =
2822 container_of(ex, struct rbd_obj_request, ex);
2823 struct ceph_bvec_iter *it = arg;
2825 obj_req->bvec_pos = *it;
2826 ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2827 ceph_bvec_iter_advance(it, bytes);
2830 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2832 struct rbd_obj_request *obj_req =
2833 container_of(ex, struct rbd_obj_request, ex);
2834 struct ceph_bvec_iter *it = arg;
2836 ceph_bvec_iter_advance_step(it, bytes, ({
2837 obj_req->bvec_count++;
2841 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2843 struct rbd_obj_request *obj_req =
2844 container_of(ex, struct rbd_obj_request, ex);
2845 struct ceph_bvec_iter *it = arg;
2847 ceph_bvec_iter_advance_step(it, bytes, ({
2848 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2849 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2853 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2854 struct ceph_file_extent *img_extents,
2855 u32 num_img_extents,
2856 struct ceph_bvec_iter *bvec_pos)
2858 struct rbd_img_fill_ctx fctx = {
2859 .pos_type = OBJ_REQUEST_BVECS,
2860 .pos = (union rbd_img_fill_iter *)bvec_pos,
2861 .set_pos_fn = set_bvec_pos,
2862 .count_fn = count_bvecs,
2863 .copy_fn = copy_bvecs,
	return rbd_img_fill_request(img_req, img_extents, num_img_extents,
				    &fctx);
}
2870 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2871 struct ceph_file_extent *img_extents,
2872 u32 num_img_extents,
2873 struct bio_vec *bvecs)
{
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = ceph_file_extents_bytes(img_extents,
							     num_img_extents) },
	};

	return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
					 &it);
}
2885 static void rbd_img_handle_request_work(struct work_struct *work)
2887 struct rbd_img_request *img_req =
2888 container_of(work, struct rbd_img_request, work);
2890 rbd_img_handle_request(img_req, img_req->work_result);
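/*
 * Defer state machine processing to the rbd workqueue.  This breaks
 * what could otherwise become a deep recursion through parent images
 * when a child image request completes.
 */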
2893 static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2895 INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2896 img_req->work_result = result;
2897 queue_work(rbd_wq, &img_req->work);
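/*
 * Consult the object map (if enabled): an object that is known not to
 * exist can be handled as if the OSD had returned -ENOENT, without
 * actually sending the read.
 */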
2900 static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2902 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2904 if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2905 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2909 dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2910 obj_req->ex.oe_objno);
2914 static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2916 struct ceph_osd_request *osd_req;
2919 osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2920 if (IS_ERR(osd_req))
2921 return PTR_ERR(osd_req);
2923 osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2924 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2925 rbd_osd_setup_data(osd_req, 0);
2926 rbd_osd_format_read(osd_req);
2928 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2932 rbd_osd_submit(osd_req);
2936 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2938 struct rbd_img_request *img_req = obj_req->img_request;
2939 struct rbd_img_request *child_img_req;
2942 child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2947 __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2948 child_img_req->obj_request = obj_req;
2950 dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2953 if (!rbd_img_is_write(img_req)) {
2954 switch (img_req->data_type) {
2955 case OBJ_REQUEST_BIO:
2956 ret = __rbd_img_fill_from_bio(child_img_req,
2957 obj_req->img_extents,
2958 obj_req->num_img_extents,
2961 case OBJ_REQUEST_BVECS:
2962 case OBJ_REQUEST_OWN_BVECS:
2963 ret = __rbd_img_fill_from_bvecs(child_img_req,
2964 obj_req->img_extents,
2965 obj_req->num_img_extents,
2966 &obj_req->bvec_pos);
2972 ret = rbd_img_fill_from_bvecs(child_img_req,
2973 obj_req->img_extents,
2974 obj_req->num_img_extents,
2975 obj_req->copyup_bvecs);
2978 rbd_img_request_put(child_img_req);
2982 /* avoid parent chain recursion */
2983 rbd_img_schedule(child_img_req, 0);
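/*
 * Advance the object read state machine: issue the read, fall back to
 * the parent image on -ENOENT if there is an overlap, and zero-fill
 * holes and short reads.  Returns true when the object request is
 * done, with *result holding the outcome.
 */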
2987 static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2989 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2993 switch (obj_req->read_state) {
2994 case RBD_OBJ_READ_START:
2995 rbd_assert(!*result);
2997 if (!rbd_obj_may_exist(obj_req)) {
2999 obj_req->read_state = RBD_OBJ_READ_OBJECT;
3003 ret = rbd_obj_read_object(obj_req);
3008 obj_req->read_state = RBD_OBJ_READ_OBJECT;
3010 case RBD_OBJ_READ_OBJECT:
3011 if (*result == -ENOENT && rbd_dev->parent_overlap) {
3012 /* reverse map this object extent onto the parent */
3013 ret = rbd_obj_calc_img_extents(obj_req, false);
3018 if (obj_req->num_img_extents) {
3019 ret = rbd_obj_read_from_parent(obj_req);
3024 obj_req->read_state = RBD_OBJ_READ_PARENT;
		/*
		 * -ENOENT means a hole in the image -- zero-fill the entire
		 * length of the request.  A short read also implies zero-fill
		 * to the end of the request.
		 */
3034 if (*result == -ENOENT) {
3035 rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
3037 } else if (*result >= 0) {
3038 if (*result < obj_req->ex.oe_len)
3039 rbd_obj_zero_range(obj_req, *result,
3040 obj_req->ex.oe_len - *result);
3042 rbd_assert(*result == obj_req->ex.oe_len);
3046 case RBD_OBJ_READ_PARENT:
		/*
		 * The parent image is read only up to the overlap -- zero-fill
		 * from the overlap to the end of the request.
		 */
		if (!*result) {
3052 u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
3054 if (obj_overlap < obj_req->ex.oe_len)
3055 rbd_obj_zero_range(obj_req, obj_overlap,
3056 obj_req->ex.oe_len - obj_overlap);
3064 static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
3066 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3068 if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
3069 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
3071 if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
3072 (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
3073 dout("%s %p noop for nonexistent\n", __func__, obj_req);
/*
 * Return:
 *   0 - object map update sent
 *   1 - object map update isn't needed
 *  <0 - error
 */
3086 static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
3088 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3091 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3094 if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3095 new_state = OBJECT_PENDING;
3097 new_state = OBJECT_EXISTS;
3099 return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
3102 static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
3104 struct ceph_osd_request *osd_req;
3105 int num_ops = count_write_ops(obj_req);
3109 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
3110 num_ops++; /* stat */
3112 osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3113 if (IS_ERR(osd_req))
3114 return PTR_ERR(osd_req);
3116 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3117 ret = rbd_osd_setup_stat(osd_req, which++);
3122 rbd_osd_setup_write_ops(osd_req, which);
3123 rbd_osd_format_write(osd_req);
3125 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3129 rbd_osd_submit(osd_req);
/*
 * copyup_bvecs pages are never highmem pages
 */
static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
{
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	ceph_bvec_iter_advance_step(&it, bytes, ({
		if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
			       bv.bv_len))
			return false;
	}));
	return true;
}
3151 #define MODS_ONLY U32_MAX
3153 static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3156 struct ceph_osd_request *osd_req;
3159 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3160 rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3162 osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3163 if (IS_ERR(osd_req))
3164 return PTR_ERR(osd_req);
3166 ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3170 rbd_osd_format_write(osd_req);
3172 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3176 rbd_osd_submit(osd_req);
3180 static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3183 struct ceph_osd_request *osd_req;
3184 int num_ops = count_write_ops(obj_req);
3188 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3190 if (bytes != MODS_ONLY)
3191 num_ops++; /* copyup */
3193 osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3194 if (IS_ERR(osd_req))
3195 return PTR_ERR(osd_req);
3197 if (bytes != MODS_ONLY) {
3198 ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3203 rbd_osd_setup_write_ops(osd_req, which);
3204 rbd_osd_format_write(osd_req);
3206 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3210 rbd_osd_submit(osd_req);
3214 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3218 rbd_assert(!obj_req->copyup_bvecs);
3219 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3220 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3221 sizeof(*obj_req->copyup_bvecs),
3223 if (!obj_req->copyup_bvecs)
3226 for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3227 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3229 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3230 if (!obj_req->copyup_bvecs[i].bv_page)
3233 obj_req->copyup_bvecs[i].bv_offset = 0;
3234 obj_req->copyup_bvecs[i].bv_len = len;
3238 rbd_assert(!obj_overlap);
/*
 * The target object doesn't exist.  Read the data for the entire
 * target object up to the overlap point (if any) from the parent,
 * so we can use it for a copyup.
 */
3247 static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3249 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3252 rbd_assert(obj_req->num_img_extents);
3253 prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3254 rbd_dev->parent_overlap);
3255 if (!obj_req->num_img_extents) {
3257 * The overlap has become 0 (most likely because the
3258 * image has been flattened). Re-submit the original write
		 * request -- pass MODS_ONLY since the copyup isn't needed
		 * anymore.
		 */
		return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
	}
3265 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3269 return rbd_obj_read_from_parent(obj_req);
3272 static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3274 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3275 struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3280 rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3282 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3285 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3288 for (i = 0; i < snapc->num_snaps; i++) {
3289 if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3290 i + 1 < snapc->num_snaps)
3291 new_state = OBJECT_EXISTS_CLEAN;
3293 new_state = OBJECT_EXISTS;
3295 ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3298 obj_req->pending.result = ret;
3303 obj_req->pending.num_pending++;
3307 static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3309 u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3312 rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
	/*
	 * Only send non-zero copyup data to save some I/O and network
	 * bandwidth -- zero copyup data is equivalent to the object not
	 * existing.
	 */
	if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
		bytes = 0;
3322 if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
		/*
		 * Send a copyup request with an empty snapshot context to
		 * deep-copyup the object through all existing snapshots.
		 * A second request with the current snapshot context will be
		 * sent for the actual modification.
		 */
3329 ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3331 obj_req->pending.result = ret;
3335 obj_req->pending.num_pending++;
3339 ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3341 obj_req->pending.result = ret;
3345 obj_req->pending.num_pending++;
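/*
 * Advance the copyup state machine: read the overlapping data from the
 * parent, update snapshot object maps if needed, and finally write out
 * the object, folding the original modification into the copyup.
 */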
3348 static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3350 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3354 switch (obj_req->copyup_state) {
3355 case RBD_OBJ_COPYUP_START:
3356 rbd_assert(!*result);
3358 ret = rbd_obj_copyup_read_parent(obj_req);
3363 if (obj_req->num_img_extents)
3364 obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3366 obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3368 case RBD_OBJ_COPYUP_READ_PARENT:
3372 if (is_zero_bvecs(obj_req->copyup_bvecs,
3373 rbd_obj_img_extents_bytes(obj_req))) {
3374 dout("%s %p detected zeros\n", __func__, obj_req);
3375 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3378 rbd_obj_copyup_object_maps(obj_req);
3379 if (!obj_req->pending.num_pending) {
3380 *result = obj_req->pending.result;
3381 obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3384 obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3386 case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3387 if (!pending_result_dec(&obj_req->pending, result))
3390 case RBD_OBJ_COPYUP_OBJECT_MAPS:
3392 rbd_warn(rbd_dev, "snap object map update failed: %d",
3397 rbd_obj_copyup_write_object(obj_req);
3398 if (!obj_req->pending.num_pending) {
3399 *result = obj_req->pending.result;
3400 obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3403 obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3405 case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3406 if (!pending_result_dec(&obj_req->pending, result))
3409 case RBD_OBJ_COPYUP_WRITE_OBJECT:
/*
 * Return:
 *   0 - object map update sent
 *   1 - object map update isn't needed
 *  <0 - error
 */
3422 static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3424 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3425 u8 current_state = OBJECT_PENDING;
3427 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3430 if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3433 return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
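/*
 * Advance the object write state machine: pre object map update,
 * object write (possibly detouring through copyup on -ENOENT), then
 * post object map update for deletions.
 */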
3437 static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3439 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3443 switch (obj_req->write_state) {
3444 case RBD_OBJ_WRITE_START:
3445 rbd_assert(!*result);
3447 if (rbd_obj_write_is_noop(obj_req))
3450 ret = rbd_obj_write_pre_object_map(obj_req);
3455 obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3459 case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3461 rbd_warn(rbd_dev, "pre object map update failed: %d",
3465 ret = rbd_obj_write_object(obj_req);
3470 obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3472 case RBD_OBJ_WRITE_OBJECT:
3473 if (*result == -ENOENT) {
3474 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3476 obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3477 obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
			/*
			 * On a non-existent object:
			 *   delete - -ENOENT, truncate/zero - 0
			 */
3484 if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3490 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3492 case __RBD_OBJ_WRITE_COPYUP:
3493 if (!rbd_obj_advance_copyup(obj_req, result))
3496 case RBD_OBJ_WRITE_COPYUP:
3498 rbd_warn(rbd_dev, "copyup failed: %d", *result);
3501 ret = rbd_obj_write_post_object_map(obj_req);
3506 obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3510 case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3512 rbd_warn(rbd_dev, "post object map update failed: %d",
/*
 * Return true if @obj_req is completed.
 */
3523 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3526 struct rbd_img_request *img_req = obj_req->img_request;
3527 struct rbd_device *rbd_dev = img_req->rbd_dev;
3530 mutex_lock(&obj_req->state_mutex);
3531 if (!rbd_img_is_write(img_req))
3532 done = rbd_obj_advance_read(obj_req, result);
3534 done = rbd_obj_advance_write(obj_req, result);
3535 mutex_unlock(&obj_req->state_mutex);
3537 if (done && *result) {
3538 rbd_assert(*result < 0);
3539 rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3540 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3541 obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
/*
 * This is open-coded in rbd_img_handle_request() to avoid parent chain
 * recursion.
 */
3550 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3552 if (__rbd_obj_handle_request(obj_req, &result))
3553 rbd_img_handle_request(obj_req->img_request, result);
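/*
 * Writes always need the exclusive lock; reads need it only if
 * lock_on_read is set or the object map is in use.  Snapshots are
 * read-only, so mappings of snapshots never take the lock.
 */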
3556 static bool need_exclusive_lock(struct rbd_img_request *img_req)
3558 struct rbd_device *rbd_dev = img_req->rbd_dev;
3560 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3563 if (rbd_is_snap(rbd_dev))
3566 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3567 if (rbd_dev->opts->lock_on_read ||
3568 (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3571 return rbd_img_is_write(img_req);
3574 static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3576 struct rbd_device *rbd_dev = img_req->rbd_dev;
3579 lockdep_assert_held(&rbd_dev->lock_rwsem);
3580 locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3581 spin_lock(&rbd_dev->lock_lists_lock);
3582 rbd_assert(list_empty(&img_req->lock_item));
3584 list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3586 list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3587 spin_unlock(&rbd_dev->lock_lists_lock);
3591 static void rbd_lock_del_request(struct rbd_img_request *img_req)
3593 struct rbd_device *rbd_dev = img_req->rbd_dev;
3596 lockdep_assert_held(&rbd_dev->lock_rwsem);
3597 spin_lock(&rbd_dev->lock_lists_lock);
3598 rbd_assert(!list_empty(&img_req->lock_item));
3599 list_del_init(&img_req->lock_item);
3600 need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3601 list_empty(&rbd_dev->running_list));
3602 spin_unlock(&rbd_dev->lock_lists_lock);
3604 complete(&rbd_dev->releasing_wait);
3607 static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3609 struct rbd_device *rbd_dev = img_req->rbd_dev;
3611 if (!need_exclusive_lock(img_req))
3614 if (rbd_lock_add_request(img_req))
3617 if (rbd_dev->opts->exclusive) {
3618 WARN_ON(1); /* lock got released? */
	/*
	 * Note the use of mod_delayed_work() in rbd_acquire_lock()
	 * and cancel_delayed_work() in wake_lock_waiters().
	 */
3626 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3627 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3631 static void rbd_img_object_requests(struct rbd_img_request *img_req)
3633 struct rbd_obj_request *obj_req;
3635 rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3637 for_each_obj_request(img_req, obj_req) {
3640 if (__rbd_obj_handle_request(obj_req, &result)) {
3642 img_req->pending.result = result;
3646 img_req->pending.num_pending++;
3651 static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3653 struct rbd_device *rbd_dev = img_req->rbd_dev;
3657 switch (img_req->state) {
3659 rbd_assert(!*result);
3661 ret = rbd_img_exclusive_lock(img_req);
3666 img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3670 case RBD_IMG_EXCLUSIVE_LOCK:
3674 rbd_assert(!need_exclusive_lock(img_req) ||
3675 __rbd_is_lock_owner(rbd_dev));
3677 rbd_img_object_requests(img_req);
3678 if (!img_req->pending.num_pending) {
3679 *result = img_req->pending.result;
3680 img_req->state = RBD_IMG_OBJECT_REQUESTS;
3683 img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3685 case __RBD_IMG_OBJECT_REQUESTS:
3686 if (!pending_result_dec(&img_req->pending, result))
3689 case RBD_IMG_OBJECT_REQUESTS:
/*
 * Return true if @img_req is completed.
 */
3699 static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3702 struct rbd_device *rbd_dev = img_req->rbd_dev;
3705 if (need_exclusive_lock(img_req)) {
3706 down_read(&rbd_dev->lock_rwsem);
3707 mutex_lock(&img_req->state_mutex);
3708 done = rbd_img_advance(img_req, result);
3710 rbd_lock_del_request(img_req);
3711 mutex_unlock(&img_req->state_mutex);
3712 up_read(&rbd_dev->lock_rwsem);
3714 mutex_lock(&img_req->state_mutex);
3715 done = rbd_img_advance(img_req, result);
3716 mutex_unlock(&img_req->state_mutex);
3719 if (done && *result) {
3720 rbd_assert(*result < 0);
3721 rbd_warn(rbd_dev, "%s%s result %d",
3722 test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3723 obj_op_name(img_req->op_type), *result);
3728 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3731 if (!__rbd_img_handle_request(img_req, &result))
3734 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3735 struct rbd_obj_request *obj_req = img_req->obj_request;
3737 rbd_img_request_put(img_req);
3738 if (__rbd_obj_handle_request(obj_req, &result)) {
3739 img_req = obj_req->img_request;
3743 struct request *rq = img_req->rq;
3745 rbd_img_request_put(img_req);
3746 blk_mq_end_request(rq, errno_to_blk_status(result));
3750 static const struct rbd_client_id rbd_empty_cid;
3752 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3753 const struct rbd_client_id *rhs)
3755 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3758 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3760 struct rbd_client_id cid;
3762 mutex_lock(&rbd_dev->watch_mutex);
3763 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3764 cid.handle = rbd_dev->watch_cookie;
3765 mutex_unlock(&rbd_dev->watch_mutex);
/*
 * lock_rwsem must be held for write
 */
3772 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3773 const struct rbd_client_id *cid)
3775 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3776 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3777 cid->gid, cid->handle);
3778 rbd_dev->owner_cid = *cid; /* struct */
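/*
 * The lock cookie ties the lock to our watch: it is the
 * RBD_LOCK_COOKIE_PREFIX followed by the watch cookie, e.g.
 * "auto 18362" (numeric value illustrative).
 */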
3781 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3783 mutex_lock(&rbd_dev->watch_mutex);
3784 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3785 mutex_unlock(&rbd_dev->watch_mutex);
3788 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3790 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3792 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3793 strcpy(rbd_dev->lock_cookie, cookie);
3794 rbd_set_owner_cid(rbd_dev, &cid);
3795 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
/*
 * lock_rwsem must be held for write
 */
3801 static int rbd_lock(struct rbd_device *rbd_dev)
3803 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3807 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3808 rbd_dev->lock_cookie[0] != '\0');
3810 format_lock_cookie(rbd_dev, cookie);
3811 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3812 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3813 RBD_LOCK_TAG, "", 0);
3817 __rbd_lock(rbd_dev, cookie);
/*
 * lock_rwsem must be held for write
 */
3824 static void rbd_unlock(struct rbd_device *rbd_dev)
3826 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3829 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3830 rbd_dev->lock_cookie[0] == '\0');
3832 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3833 RBD_LOCK_NAME, rbd_dev->lock_cookie);
3834 if (ret && ret != -ENOENT)
3835 rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3837 /* treat errors as the image is unlocked */
3838 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3839 rbd_dev->lock_cookie[0] = '\0';
3840 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3841 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3844 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3845 enum rbd_notify_op notify_op,
3846 struct page ***preply_pages,
3849 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3850 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3851 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3852 int buf_size = sizeof(buf);
3855 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3857 /* encode *LockPayload NotifyMessage (op + ClientId) */
3858 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3859 ceph_encode_32(&p, notify_op);
3860 ceph_encode_64(&p, cid.gid);
3861 ceph_encode_64(&p, cid.handle);
3863 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3864 &rbd_dev->header_oloc, buf, buf_size,
3865 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3868 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3869 enum rbd_notify_op notify_op)
3871 struct page **reply_pages;
3874 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3875 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3878 static void rbd_notify_acquired_lock(struct work_struct *work)
3880 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3881 acquired_lock_work);
3883 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3886 static void rbd_notify_released_lock(struct work_struct *work)
3888 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3889 released_lock_work);
3891 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3894 static int rbd_request_lock(struct rbd_device *rbd_dev)
3896 struct page **reply_pages;
3898 bool lock_owner_responded = false;
3901 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3903 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3904 &reply_pages, &reply_len);
3905 if (ret && ret != -ETIMEDOUT) {
3906 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3910 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3911 void *p = page_address(reply_pages[0]);
3912 void *const end = p + reply_len;
3915 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3920 ceph_decode_need(&p, end, 8 + 8, e_inval);
3921 p += 8 + 8; /* skip gid and cookie */
3923 ceph_decode_32_safe(&p, end, len, e_inval);
3927 if (lock_owner_responded) {
3929 "duplicate lock owners detected");
3934 lock_owner_responded = true;
3935 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3939 "failed to decode ResponseMessage: %d",
3944 ret = ceph_decode_32(&p);
3948 if (!lock_owner_responded) {
3949 rbd_warn(rbd_dev, "no lock owners detected");
3954 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
/*
 * Either image request state machine(s) or rbd_add_acquire_lock()
 * (i.e. "rbd map") may be waiting here for the lock acquisition result.
 */
3966 static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3968 struct rbd_img_request *img_req;
3970 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3971 lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3973 cancel_delayed_work(&rbd_dev->lock_dwork);
3974 if (!completion_done(&rbd_dev->acquire_wait)) {
3975 rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3976 list_empty(&rbd_dev->running_list));
3977 rbd_dev->acquire_err = result;
3978 complete_all(&rbd_dev->acquire_wait);
3982 list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3983 mutex_lock(&img_req->state_mutex);
3984 rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3985 rbd_img_schedule(img_req, result);
3986 mutex_unlock(&img_req->state_mutex);
3989 list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3992 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3993 struct ceph_locker **lockers, u32 *num_lockers)
3995 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4000 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4002 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
4003 &rbd_dev->header_oloc, RBD_LOCK_NAME,
4004 &lock_type, &lock_tag, lockers, num_lockers);
4008 if (*num_lockers == 0) {
4009 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
4013 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
4014 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
4020 if (lock_type == CEPH_CLS_LOCK_SHARED) {
4021 rbd_warn(rbd_dev, "shared lock type detected");
4026 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
4027 strlen(RBD_LOCK_COOKIE_PREFIX))) {
4028 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
4029 (*lockers)[0].id.cookie);
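/*
 * Look for a watch on the header object matching the locker's address
 * and cookie.  Returns 1 if the lock owner is still watching (i.e.
 * alive), 0 if not, or a negative error code.
 */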
4039 static int find_watcher(struct rbd_device *rbd_dev,
4040 const struct ceph_locker *locker)
4042 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4043 struct ceph_watch_item *watchers;
4049 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
4050 &rbd_dev->header_oloc, &watchers,
4055 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
4056 for (i = 0; i < num_watchers; i++) {
4057 if (!memcmp(&watchers[i].addr, &locker->info.addr,
4058 sizeof(locker->info.addr)) &&
4059 watchers[i].cookie == cookie) {
4060 struct rbd_client_id cid = {
4061 .gid = le64_to_cpu(watchers[i].name.num),
4065 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
4066 rbd_dev, cid.gid, cid.handle);
4067 rbd_set_owner_cid(rbd_dev, &cid);
4073 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
/*
 * lock_rwsem must be held for write
 */
4083 static int rbd_try_lock(struct rbd_device *rbd_dev)
4085 struct ceph_client *client = rbd_dev->rbd_client->client;
4086 struct ceph_locker *lockers;
4091 ret = rbd_lock(rbd_dev);
4095 /* determine if the current lock holder is still alive */
4096 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
4100 if (num_lockers == 0)
4103 ret = find_watcher(rbd_dev, lockers);
4105 goto out; /* request lock or error */
4107 rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
4108 ENTITY_NAME(lockers[0].id.name));
4110 ret = ceph_monc_blacklist_add(&client->monc,
4111 &lockers[0].info.addr);
4113 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
4114 ENTITY_NAME(lockers[0].id.name), ret);
4118 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
4119 &rbd_dev->header_oloc, RBD_LOCK_NAME,
4120 lockers[0].id.cookie,
4121 &lockers[0].id.name);
4122 if (ret && ret != -ENOENT)
4126 ceph_free_lockers(lockers, num_lockers);
4130 ceph_free_lockers(lockers, num_lockers);
4134 static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
4138 if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4139 ret = rbd_object_map_open(rbd_dev);
/*
 * Return:
 *   0 - lock acquired
 *   1 - caller should call rbd_request_lock()
 *  <0 - error
 */
4153 static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4157 down_read(&rbd_dev->lock_rwsem);
4158 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4159 rbd_dev->lock_state);
4160 if (__rbd_is_lock_owner(rbd_dev)) {
4161 up_read(&rbd_dev->lock_rwsem);
4165 up_read(&rbd_dev->lock_rwsem);
4166 down_write(&rbd_dev->lock_rwsem);
4167 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4168 rbd_dev->lock_state);
4169 if (__rbd_is_lock_owner(rbd_dev)) {
4170 up_write(&rbd_dev->lock_rwsem);
4174 ret = rbd_try_lock(rbd_dev);
4176 rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4177 if (ret == -EBLACKLISTED)
4180 ret = 1; /* request lock anyway */
4183 up_write(&rbd_dev->lock_rwsem);
4187 rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4188 rbd_assert(list_empty(&rbd_dev->running_list));
4190 ret = rbd_post_acquire_action(rbd_dev);
4192 rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4194 * Can't stay in RBD_LOCK_STATE_LOCKED because
4195 * rbd_lock_add_request() would let the request through,
4196 * assuming that e.g. object map is locked and loaded.
4198 rbd_unlock(rbd_dev);
4202 wake_lock_waiters(rbd_dev, ret);
4203 up_write(&rbd_dev->lock_rwsem);
4207 static void rbd_acquire_lock(struct work_struct *work)
4209 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4210 struct rbd_device, lock_dwork);
4213 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4215 ret = rbd_try_acquire_lock(rbd_dev);
4217 dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4221 ret = rbd_request_lock(rbd_dev);
4222 if (ret == -ETIMEDOUT) {
4223 goto again; /* treat this as a dead client */
4224 } else if (ret == -EROFS) {
4225 rbd_warn(rbd_dev, "peer will not release lock");
4226 down_write(&rbd_dev->lock_rwsem);
4227 wake_lock_waiters(rbd_dev, ret);
4228 up_write(&rbd_dev->lock_rwsem);
4229 } else if (ret < 0) {
4230 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4231 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
		/*
		 * lock owner acked, but resend if we don't see them
		 * release the lock
		 */
4238 dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4240 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4241 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
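/*
 * Quiesce the lock: flag the lock as RELEASING and wait for all
 * requests on the running list to finish.  Returns true if the lock is
 * now quiesced and can be released.
 */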
4245 static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4249 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4250 lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4252 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4256 * Ensure that all in-flight IO is flushed.
4258 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4259 rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4260 need_wait = !list_empty(&rbd_dev->running_list);
4261 downgrade_write(&rbd_dev->lock_rwsem);
4263 wait_for_completion(&rbd_dev->releasing_wait);
4264 up_read(&rbd_dev->lock_rwsem);
4266 down_write(&rbd_dev->lock_rwsem);
4267 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4270 rbd_assert(list_empty(&rbd_dev->running_list));
4274 static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4276 if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4277 rbd_object_map_close(rbd_dev);
4280 static void __rbd_release_lock(struct rbd_device *rbd_dev)
4282 rbd_assert(list_empty(&rbd_dev->running_list));
4284 rbd_pre_release_action(rbd_dev);
4285 rbd_unlock(rbd_dev);
/*
 * lock_rwsem must be held for write
 */
4291 static void rbd_release_lock(struct rbd_device *rbd_dev)
4293 if (!rbd_quiesce_lock(rbd_dev))
4296 __rbd_release_lock(rbd_dev);
	/*
	 * Give others a chance to grab the lock - we would re-acquire
	 * almost immediately if we got new IO while draining the running
	 * list otherwise.  We need to ack our own notifications, so this
	 * lock_dwork will be requeued from rbd_handle_released_lock() by
	 * way of maybe_kick_acquire().
	 */
4305 cancel_delayed_work(&rbd_dev->lock_dwork);
4308 static void rbd_release_lock_work(struct work_struct *work)
4310 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4313 down_write(&rbd_dev->lock_rwsem);
4314 rbd_release_lock(rbd_dev);
4315 up_write(&rbd_dev->lock_rwsem);
4318 static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4322 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4323 if (__rbd_is_lock_owner(rbd_dev))
4326 spin_lock(&rbd_dev->lock_lists_lock);
4327 have_requests = !list_empty(&rbd_dev->acquiring_list);
4328 spin_unlock(&rbd_dev->lock_lists_lock);
4329 if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4330 dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4331 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4335 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4338 struct rbd_client_id cid = { 0 };
4340 if (struct_v >= 2) {
4341 cid.gid = ceph_decode_64(p);
4342 cid.handle = ceph_decode_64(p);
4345 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4347 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4348 down_write(&rbd_dev->lock_rwsem);
4349 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
			/*
			 * we already know that the remote client is
			 * the owner
			 */
			up_write(&rbd_dev->lock_rwsem);
			return;
		}
4358 rbd_set_owner_cid(rbd_dev, &cid);
4359 downgrade_write(&rbd_dev->lock_rwsem);
4361 down_read(&rbd_dev->lock_rwsem);
4364 maybe_kick_acquire(rbd_dev);
4365 up_read(&rbd_dev->lock_rwsem);
4368 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4371 struct rbd_client_id cid = { 0 };
4373 if (struct_v >= 2) {
4374 cid.gid = ceph_decode_64(p);
4375 cid.handle = ceph_decode_64(p);
4378 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4380 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4381 down_write(&rbd_dev->lock_rwsem);
4382 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4383 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
4384 __func__, rbd_dev, cid.gid, cid.handle,
4385 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4386 up_write(&rbd_dev->lock_rwsem);
4390 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4391 downgrade_write(&rbd_dev->lock_rwsem);
4393 down_read(&rbd_dev->lock_rwsem);
4396 maybe_kick_acquire(rbd_dev);
4397 up_read(&rbd_dev->lock_rwsem);
/*
 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
 * ResponseMessage is needed.
 */
4404 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4407 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4408 struct rbd_client_id cid = { 0 };
4411 if (struct_v >= 2) {
4412 cid.gid = ceph_decode_64(p);
4413 cid.handle = ceph_decode_64(p);
4416 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4418 if (rbd_cid_equal(&cid, &my_cid))
4421 down_read(&rbd_dev->lock_rwsem);
4422 if (__rbd_is_lock_owner(rbd_dev)) {
4423 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4424 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
		/*
		 * encode ResponseMessage(0) so the peer can detect
		 * a missing owner
		 */
		result = 0;
4433 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4434 if (!rbd_dev->opts->exclusive) {
4435 dout("%s rbd_dev %p queueing unlock_work\n",
4437 queue_work(rbd_dev->task_wq,
4438 &rbd_dev->unlock_work);
4440 /* refuse to release the lock */
4447 up_read(&rbd_dev->lock_rwsem);
4451 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4452 u64 notify_id, u64 cookie, s32 *result)
4454 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4455 char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4456 int buf_size = sizeof(buf);
4462 /* encode ResponseMessage */
4463 ceph_start_encoding(&p, 1, 1,
4464 buf_size - CEPH_ENCODING_START_BLK_LEN);
4465 ceph_encode_32(&p, *result);
4470 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4471 &rbd_dev->header_oloc, notify_id, cookie,
4474 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4477 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4480 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4481 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4484 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4485 u64 notify_id, u64 cookie, s32 result)
4487 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4488 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
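/*
 * Watch callback: decode the NotifyMessage and dispatch on the notify
 * op (lock acquired/released/requested or header update).  Every
 * notification must be acked, with a ResponseMessage where expected.
 */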
4491 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4492 u64 notifier_id, void *data, size_t data_len)
{
	struct rbd_device *rbd_dev = arg;
	void *p = data;
	void *const end = p + data_len;
	u8 struct_v = 0;
	u32 len;
	u32 notify_op;
	int ret;
4502 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4503 __func__, rbd_dev, cookie, notify_id, data_len);
4505 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4508 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4513 notify_op = ceph_decode_32(&p);
4515 /* legacy notification for header updates */
4516 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4520 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4521 switch (notify_op) {
4522 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4523 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4524 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4526 case RBD_NOTIFY_OP_RELEASED_LOCK:
4527 rbd_handle_released_lock(rbd_dev, struct_v, &p);
4528 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4530 case RBD_NOTIFY_OP_REQUEST_LOCK:
4531 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4533 rbd_acknowledge_notify_result(rbd_dev, notify_id,
4536 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4538 case RBD_NOTIFY_OP_HEADER_UPDATE:
4539 ret = rbd_dev_refresh(rbd_dev);
4541 rbd_warn(rbd_dev, "refresh failed: %d", ret);
4543 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4546 if (rbd_is_lock_owner(rbd_dev))
4547 rbd_acknowledge_notify_result(rbd_dev, notify_id,
4548 cookie, -EOPNOTSUPP);
4550 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4555 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4557 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4559 struct rbd_device *rbd_dev = arg;
4561 rbd_warn(rbd_dev, "encountered watch error: %d", err);
4563 down_write(&rbd_dev->lock_rwsem);
4564 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4565 up_write(&rbd_dev->lock_rwsem);
4567 mutex_lock(&rbd_dev->watch_mutex);
4568 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4569 __rbd_unregister_watch(rbd_dev);
4570 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4572 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4574 mutex_unlock(&rbd_dev->watch_mutex);
/*
 * watch_mutex must be locked
 */
4580 static int __rbd_register_watch(struct rbd_device *rbd_dev)
4582 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4583 struct ceph_osd_linger_request *handle;
4585 rbd_assert(!rbd_dev->watch_handle);
4586 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4588 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4589 &rbd_dev->header_oloc, rbd_watch_cb,
4590 rbd_watch_errcb, rbd_dev);
4592 return PTR_ERR(handle);
4594 rbd_dev->watch_handle = handle;
/*
 * watch_mutex must be locked
 */
4601 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4603 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4606 rbd_assert(rbd_dev->watch_handle);
4607 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4609 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4611 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4613 rbd_dev->watch_handle = NULL;
4616 static int rbd_register_watch(struct rbd_device *rbd_dev)
4620 mutex_lock(&rbd_dev->watch_mutex);
4621 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4622 ret = __rbd_register_watch(rbd_dev);
4626 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4627 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4630 mutex_unlock(&rbd_dev->watch_mutex);
4634 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4636 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4638 cancel_work_sync(&rbd_dev->acquired_lock_work);
4639 cancel_work_sync(&rbd_dev->released_lock_work);
4640 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4641 cancel_work_sync(&rbd_dev->unlock_work);
4644 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4646 cancel_tasks_sync(rbd_dev);
4648 mutex_lock(&rbd_dev->watch_mutex);
4649 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4650 __rbd_unregister_watch(rbd_dev);
4651 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4652 mutex_unlock(&rbd_dev->watch_mutex);
4654 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4655 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
/*
 * lock_rwsem must be held for write
 */
4661 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4663 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4667 if (!rbd_quiesce_lock(rbd_dev))
4670 format_lock_cookie(rbd_dev, cookie);
4671 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4672 &rbd_dev->header_oloc, RBD_LOCK_NAME,
4673 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4674 RBD_LOCK_TAG, cookie);
4676 if (ret != -EOPNOTSUPP)
4677 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
		/*
		 * Lock cookie cannot be updated on older OSDs, so do
		 * a manual release and queue an acquire.
		 */
4684 __rbd_release_lock(rbd_dev);
4685 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4687 __rbd_lock(rbd_dev, cookie);
4688 wake_lock_waiters(rbd_dev, 0);
4692 static void rbd_reregister_watch(struct work_struct *work)
4694 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4695 struct rbd_device, watch_dwork);
4698 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4700 mutex_lock(&rbd_dev->watch_mutex);
4701 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4702 mutex_unlock(&rbd_dev->watch_mutex);
4706 ret = __rbd_register_watch(rbd_dev);
4708 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4709 if (ret != -EBLACKLISTED && ret != -ENOENT) {
4710 queue_delayed_work(rbd_dev->task_wq,
4711 &rbd_dev->watch_dwork,
4713 mutex_unlock(&rbd_dev->watch_mutex);
4717 mutex_unlock(&rbd_dev->watch_mutex);
4718 down_write(&rbd_dev->lock_rwsem);
4719 wake_lock_waiters(rbd_dev, ret);
4720 up_write(&rbd_dev->lock_rwsem);
4724 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4725 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4726 mutex_unlock(&rbd_dev->watch_mutex);
4728 down_write(&rbd_dev->lock_rwsem);
4729 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4730 rbd_reacquire_lock(rbd_dev);
4731 up_write(&rbd_dev->lock_rwsem);
4733 ret = rbd_dev_refresh(rbd_dev);
4735 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
4742 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4743 struct ceph_object_id *oid,
4744 struct ceph_object_locator *oloc,
4745 const char *method_name,
4746 const void *outbound,
4747 size_t outbound_size,
4749 size_t inbound_size)
4751 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4752 struct page *req_page = NULL;
4753 struct page *reply_page;
	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	if (outbound_size > PAGE_SIZE)
		return -E2BIG;
4767 req_page = alloc_page(GFP_KERNEL);
4771 memcpy(page_address(req_page), outbound, outbound_size);
4774 reply_page = alloc_page(GFP_KERNEL);
4777 __free_page(req_page);
4781 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4782 CEPH_OSD_FLAG_READ, req_page, outbound_size,
4783 &reply_page, &inbound_size);
4785 memcpy(inbound, page_address(reply_page), inbound_size);
4790 __free_page(req_page);
4791 __free_page(reply_page);
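/*
 * Handle a block layer request: map it to an image request, fill the
 * image request from the bio chain (or no data for discard/zeroout)
 * and kick off the image request state machine.
 */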
4795 static void rbd_queue_workfn(struct work_struct *work)
4797 struct request *rq = blk_mq_rq_from_pdu(work);
4798 struct rbd_device *rbd_dev = rq->q->queuedata;
4799 struct rbd_img_request *img_request;
4800 struct ceph_snap_context *snapc = NULL;
4801 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4802 u64 length = blk_rq_bytes(rq);
4803 enum obj_operation_type op_type;
4807 switch (req_op(rq)) {
4808 case REQ_OP_DISCARD:
4809 op_type = OBJ_OP_DISCARD;
4811 case REQ_OP_WRITE_ZEROES:
4812 op_type = OBJ_OP_ZEROOUT;
4815 op_type = OBJ_OP_WRITE;
4818 op_type = OBJ_OP_READ;
4821 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4826 /* Ignore/skip any zero-length requests */
4829 dout("%s: zero-length request\n", __func__);
4834 if (op_type != OBJ_OP_READ && rbd_is_snap(rbd_dev)) {
4835 rbd_warn(rbd_dev, "%s on read-only snapshot",
4836 obj_op_name(op_type));
	/*
	 * Quit early if the mapped snapshot no longer exists.  It's
	 * still possible the snapshot will have disappeared by the
	 * time our request arrives at the osd, but there's no sense in
	 * sending it if we already know.
	 */
4847 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4848 dout("request for non-existent snapshot");
4849 rbd_assert(rbd_is_snap(rbd_dev));
4854 if (offset && length > U64_MAX - offset + 1) {
4855 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4856 length);
4857 result = -EINVAL;
4858 goto err_rq; /* Shouldn't happen */
4861 blk_mq_start_request(rq);
4863 down_read(&rbd_dev->header_rwsem);
4864 mapping_size = rbd_dev->mapping.size;
4865 if (op_type != OBJ_OP_READ) {
4866 snapc = rbd_dev->header.snapc;
4867 ceph_get_snap_context(snapc);
4869 up_read(&rbd_dev->header_rwsem);
4871 if (offset + length > mapping_size) {
4872 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4873 length, mapping_size);
4874 result = -EIO;
4875 goto err_rq;
4878 img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
4879 if (!img_request) {
4880 result = -ENOMEM;
4881 goto err_rq;
4883 img_request->rq = rq;
4884 snapc = NULL; /* img_request consumes a ref */
4886 dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4887 img_request, obj_op_name(op_type), offset, length);
4889 if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4890 result = rbd_img_fill_nodata(img_request, offset, length);
4891 else
4892 result = rbd_img_fill_from_bio(img_request, offset, length,
4893 rq->bio);
4894 if (result)
4895 goto err_img_request;
4897 rbd_img_handle_request(img_request, 0);
4898 return;
4900 err_img_request:
4901 rbd_img_request_put(img_request);
4902 err_rq:
4903 if (result)
4904 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4905 obj_op_name(op_type), length, offset, result);
4906 ceph_put_snap_context(snapc);
4907 err:
4908 blk_mq_end_request(rq, errno_to_blk_status(result));
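/*
 * Each blk-mq request is backed by a work item (see
 * rbd_init_request() below), so rbd_queue_workfn() runs in process
 * context and may sleep while building and submitting the image
 * request.
 */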
4911 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4912 const struct blk_mq_queue_data *bd)
4914 struct request *rq = bd->rq;
4915 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4917 queue_work(rbd_wq, work);
4918 return BLK_STS_OK;
4921 static void rbd_free_disk(struct rbd_device *rbd_dev)
4923 blk_cleanup_queue(rbd_dev->disk->queue);
4924 blk_mq_free_tag_set(&rbd_dev->tag_set);
4925 put_disk(rbd_dev->disk);
4926 rbd_dev->disk = NULL;
4929 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4930 struct ceph_object_id *oid,
4931 struct ceph_object_locator *oloc,
4932 void *buf, int buf_len)
4935 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4936 struct ceph_osd_request *req;
4937 struct page **pages;
4938 int num_pages = calc_pages_for(0, buf_len);
4939 int ret;
4941 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4942 if (!req)
4943 return -ENOMEM;
4945 ceph_oid_copy(&req->r_base_oid, oid);
4946 ceph_oloc_copy(&req->r_base_oloc, oloc);
4947 req->r_flags = CEPH_OSD_FLAG_READ;
4949 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4950 if (IS_ERR(pages)) {
4951 ret = PTR_ERR(pages);
4952 goto out_req;
4955 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4956 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4957 true);
4959 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4960 if (ret)
4961 goto out_req;
4963 ceph_osdc_start_request(osdc, req, false);
4964 ret = ceph_osdc_wait_request(osdc, req);
4965 if (ret >= 0)
4966 ceph_copy_from_page_vector(pages, buf, 0, ret);
4968 out_req:
4969 ceph_osdc_put_request(req);
4971 return ret;
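/*
 * rbd_obj_read_sync() is a raw synchronous object read with no class
 * method involved; it is used below to fetch the format 1 image
 * header, which is stored as plain object data.
 */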
4974 * Read the complete header for the given rbd device. On successful
4975 * return, the rbd_dev->header field will contain up-to-date
4976 * information about the image.
4978 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4980 struct rbd_image_header_ondisk *ondisk = NULL;
4981 u32 snap_count = 0;
4982 u64 names_size = 0;
4983 u32 want_count;
4984 int ret;
4987 * The complete header will include an array of its 64-bit
4988 * snapshot ids, followed by the names of those snapshots as
4989 * a contiguous block of NUL-terminated strings. Note that
4990 * the number of snapshots could change by the time we read
4991 * it in, in which case we re-read it.
4998 size = sizeof (*ondisk);
4999 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
5000 size += names_size;
5001 ondisk = kmalloc(size, GFP_KERNEL);
5002 if (!ondisk)
5003 return -ENOMEM;
5005 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
5006 &rbd_dev->header_oloc, ondisk, size);
5009 if ((size_t)ret < size) {
5010 ret = -ENXIO;
5011 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
5012 size, ret);
5013 goto out;
5015 if (!rbd_dev_ondisk_valid(ondisk)) {
5017 rbd_warn(rbd_dev, "invalid header");
5021 names_size = le64_to_cpu(ondisk->snap_names_len);
5022 want_count = snap_count;
5023 snap_count = le32_to_cpu(ondisk->snap_count);
5024 } while (snap_count != want_count);
5026 ret = rbd_header_from_disk(rbd_dev, ondisk);
5027 out:
5028 kfree(ondisk);
5030 return ret;
5034 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
5035 * has disappeared from the (just updated) snapshot context.
5037 static void rbd_exists_validate(struct rbd_device *rbd_dev)
5041 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
5044 snap_id = rbd_dev->spec->snap_id;
5045 if (snap_id == CEPH_NOSNAP)
5048 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
5049 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5052 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
5054 sector_t size;
5057 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
5058 * try to update its size. If REMOVING is set, updating size
5059 * is just useless work since the device can't be opened.
5061 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
5062 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
5063 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
5064 dout("setting size to %llu sectors", (unsigned long long)size);
5065 set_capacity(rbd_dev->disk, size);
5066 revalidate_disk(rbd_dev->disk);
5070 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
5072 u64 mapping_size;
5073 int ret;
5075 down_write(&rbd_dev->header_rwsem);
5076 mapping_size = rbd_dev->mapping.size;
5078 ret = rbd_dev_header_info(rbd_dev);
5079 if (ret)
5080 goto out;
5083 * If there is a parent, see if it has disappeared due to the
5084 * mapped image getting flattened.
5086 if (rbd_dev->parent) {
5087 ret = rbd_dev_v2_parent_info(rbd_dev);
5088 if (ret)
5089 goto out;
5092 if (!rbd_is_snap(rbd_dev)) {
5093 rbd_dev->mapping.size = rbd_dev->header.image_size;
5095 /* validate mapped snapshot's EXISTS flag */
5096 rbd_exists_validate(rbd_dev);
5099 out:
5100 up_write(&rbd_dev->header_rwsem);
5101 if (!ret && mapping_size != rbd_dev->mapping.size)
5102 rbd_dev_update_size(rbd_dev);
5104 return ret;
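/*
 * Note the ordering in rbd_dev_refresh(): the capacity update is
 * done only after header_rwsem has been released, so
 * revalidate_disk() is never called with the semaphore held.
 */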
5107 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
5108 unsigned int hctx_idx, unsigned int numa_node)
5110 struct work_struct *work = blk_mq_rq_to_pdu(rq);
5112 INIT_WORK(work, rbd_queue_workfn);
5114 return 0;
5116 static const struct blk_mq_ops rbd_mq_ops = {
5117 .queue_rq = rbd_queue_rq,
5118 .init_request = rbd_init_request,
5121 static int rbd_init_disk(struct rbd_device *rbd_dev)
5123 struct gendisk *disk;
5124 struct request_queue *q;
5125 unsigned int objset_bytes =
5126 rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
5129 /* create gendisk info */
5130 disk = alloc_disk(single_major ?
5131 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
5132 RBD_MINORS_PER_MAJOR);
5133 if (!disk)
5134 return -ENOMEM;
5136 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
5137 rbd_dev->dev_id);
5138 disk->major = rbd_dev->major;
5139 disk->first_minor = rbd_dev->minor;
5141 disk->flags |= GENHD_FL_EXT_DEVT;
5142 disk->fops = &rbd_bd_ops;
5143 disk->private_data = rbd_dev;
5145 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
5146 rbd_dev->tag_set.ops = &rbd_mq_ops;
5147 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
5148 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
5149 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
5150 rbd_dev->tag_set.nr_hw_queues = 1;
5151 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
5153 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
5154 if (err)
5155 goto out_disk;
5157 q = blk_mq_init_queue(&rbd_dev->tag_set);
5158 if (IS_ERR(q)) {
5159 err = PTR_ERR(q);
5160 goto out_tag_set;
5163 blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
5164 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
5166 blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
5167 q->limits.max_sectors = queue_max_hw_sectors(q);
5168 blk_queue_max_segments(q, USHRT_MAX);
5169 blk_queue_max_segment_size(q, UINT_MAX);
5170 blk_queue_io_min(q, rbd_dev->opts->alloc_size);
5171 blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
5173 if (rbd_dev->opts->trim) {
5174 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
5175 q->limits.discard_granularity = rbd_dev->opts->alloc_size;
5176 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
5177 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
5180 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
5181 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
5184 * disk_release() expects a queue ref from add_disk() and will
5185 * put it. Hold an extra ref until add_disk() is called.
5187 WARN_ON(!blk_get_queue(q));
5189 q->queuedata = rbd_dev;
5191 rbd_dev->disk = disk;
5193 return 0;
5194 out_tag_set:
5195 blk_mq_free_tag_set(&rbd_dev->tag_set);
5196 out_disk:
5197 put_disk(disk);
5198 return err;
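/*
 * The hw_sectors, discard and write-zeroes limits above are all
 * derived from objset_bytes (object size times stripe count), so a
 * single request never spans more than one full object set.
 */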
5205 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
5207 return container_of(dev, struct rbd_device, dev);
5210 static ssize_t rbd_size_show(struct device *dev,
5211 struct device_attribute *attr, char *buf)
5213 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5215 return sprintf(buf, "%llu\n",
5216 (unsigned long long)rbd_dev->mapping.size);
5220 * Note this shows the features for whatever's mapped, which is not
5221 * necessarily the base image.
5223 static ssize_t rbd_features_show(struct device *dev,
5224 struct device_attribute *attr, char *buf)
5226 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5228 return sprintf(buf, "0x%016llx\n",
5229 (unsigned long long)rbd_dev->mapping.features);
5232 static ssize_t rbd_major_show(struct device *dev,
5233 struct device_attribute *attr, char *buf)
5235 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5237 if (rbd_dev->major)
5238 return sprintf(buf, "%d\n", rbd_dev->major);
5240 return sprintf(buf, "(none)\n");
5243 static ssize_t rbd_minor_show(struct device *dev,
5244 struct device_attribute *attr, char *buf)
5246 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5248 return sprintf(buf, "%d\n", rbd_dev->minor);
5251 static ssize_t rbd_client_addr_show(struct device *dev,
5252 struct device_attribute *attr, char *buf)
5254 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5255 struct ceph_entity_addr *client_addr =
5256 ceph_client_addr(rbd_dev->rbd_client->client);
5258 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5259 le32_to_cpu(client_addr->nonce));
5262 static ssize_t rbd_client_id_show(struct device *dev,
5263 struct device_attribute *attr, char *buf)
5265 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5267 return sprintf(buf, "client%lld\n",
5268 ceph_client_gid(rbd_dev->rbd_client->client));
5271 static ssize_t rbd_cluster_fsid_show(struct device *dev,
5272 struct device_attribute *attr, char *buf)
5274 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5276 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5279 static ssize_t rbd_config_info_show(struct device *dev,
5280 struct device_attribute *attr, char *buf)
5282 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5284 return sprintf(buf, "%s\n", rbd_dev->config_info);
5287 static ssize_t rbd_pool_show(struct device *dev,
5288 struct device_attribute *attr, char *buf)
5290 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5292 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5295 static ssize_t rbd_pool_id_show(struct device *dev,
5296 struct device_attribute *attr, char *buf)
5298 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5300 return sprintf(buf, "%llu\n",
5301 (unsigned long long) rbd_dev->spec->pool_id);
5304 static ssize_t rbd_pool_ns_show(struct device *dev,
5305 struct device_attribute *attr, char *buf)
5307 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5309 return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5312 static ssize_t rbd_name_show(struct device *dev,
5313 struct device_attribute *attr, char *buf)
5315 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5317 if (rbd_dev->spec->image_name)
5318 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5320 return sprintf(buf, "(unknown)\n");
5323 static ssize_t rbd_image_id_show(struct device *dev,
5324 struct device_attribute *attr, char *buf)
5326 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5328 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5332 * Shows the name of the currently-mapped snapshot (or
5333 * RBD_SNAP_HEAD_NAME for the base image).
5335 static ssize_t rbd_snap_show(struct device *dev,
5336 struct device_attribute *attr,
5337 char *buf)
5339 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5341 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5344 static ssize_t rbd_snap_id_show(struct device *dev,
5345 struct device_attribute *attr, char *buf)
5347 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5349 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5353 * For a v2 image, shows the chain of parent images, separated by empty
5354 * lines. For v1 images or if there is no parent, shows "(no parent
5355 * image)".
5357 static ssize_t rbd_parent_show(struct device *dev,
5358 struct device_attribute *attr,
5359 char *buf)
5361 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5362 ssize_t count = 0;
5364 if (!rbd_dev->parent)
5365 return sprintf(buf, "(no parent image)\n");
5367 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5368 struct rbd_spec *spec = rbd_dev->parent_spec;
5370 count += sprintf(&buf[count], "%s"
5371 "pool_id %llu\npool_name %s\n"
5373 "image_id %s\nimage_name %s\n"
5374 "snap_id %llu\nsnap_name %s\n"
5376 !count ? "" : "\n", /* first? */
5377 spec->pool_id, spec->pool_name,
5378 spec->pool_ns ?: "",
5379 spec->image_id, spec->image_name ?: "(unknown)",
5380 spec->snap_id, spec->snap_name,
5381 rbd_dev->parent_overlap);
5384 return count;
5387 static ssize_t rbd_image_refresh(struct device *dev,
5388 struct device_attribute *attr,
5389 const char *buf,
5390 size_t size)
5392 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5393 int ret;
5395 ret = rbd_dev_refresh(rbd_dev);
5396 if (ret < 0)
5397 return ret;
5399 return size;
5402 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5403 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5404 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5405 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5406 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5407 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5408 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5409 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5410 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5411 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5412 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5413 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5414 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5415 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5416 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5417 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5418 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5420 static struct attribute *rbd_attrs[] = {
5421 &dev_attr_size.attr,
5422 &dev_attr_features.attr,
5423 &dev_attr_major.attr,
5424 &dev_attr_minor.attr,
5425 &dev_attr_client_addr.attr,
5426 &dev_attr_client_id.attr,
5427 &dev_attr_cluster_fsid.attr,
5428 &dev_attr_config_info.attr,
5429 &dev_attr_pool.attr,
5430 &dev_attr_pool_id.attr,
5431 &dev_attr_pool_ns.attr,
5432 &dev_attr_name.attr,
5433 &dev_attr_image_id.attr,
5434 &dev_attr_current_snap.attr,
5435 &dev_attr_snap_id.attr,
5436 &dev_attr_parent.attr,
5437 &dev_attr_refresh.attr,
5441 static struct attribute_group rbd_attr_group = {
5442 .attrs = rbd_attrs,
5445 static const struct attribute_group *rbd_attr_groups[] = {
5446 &rbd_attr_group,
5447 NULL
5450 static void rbd_dev_release(struct device *dev);
5452 static const struct device_type rbd_device_type = {
5453 .name = "rbd",
5454 .groups = rbd_attr_groups,
5455 .release = rbd_dev_release,
5458 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5460 kref_get(&spec->kref);
5462 return spec;
5465 static void rbd_spec_free(struct kref *kref);
5466 static void rbd_spec_put(struct rbd_spec *spec)
5468 if (spec)
5469 kref_put(&spec->kref, rbd_spec_free);
5472 static struct rbd_spec *rbd_spec_alloc(void)
5474 struct rbd_spec *spec;
5476 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5477 if (!spec)
5478 return NULL;
5480 spec->pool_id = CEPH_NOPOOL;
5481 spec->snap_id = CEPH_NOSNAP;
5482 kref_init(&spec->kref);
5484 return spec;
5487 static void rbd_spec_free(struct kref *kref)
5489 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5491 kfree(spec->pool_name);
5492 kfree(spec->pool_ns);
5493 kfree(spec->image_id);
5494 kfree(spec->image_name);
5495 kfree(spec->snap_name);
5496 kfree(spec);
5499 static void rbd_dev_free(struct rbd_device *rbd_dev)
5501 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5502 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5504 ceph_oid_destroy(&rbd_dev->header_oid);
5505 ceph_oloc_destroy(&rbd_dev->header_oloc);
5506 kfree(rbd_dev->config_info);
5508 rbd_put_client(rbd_dev->rbd_client);
5509 rbd_spec_put(rbd_dev->spec);
5510 kfree(rbd_dev->opts);
5511 kfree(rbd_dev);
5514 static void rbd_dev_release(struct device *dev)
5516 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5517 bool need_put = !!rbd_dev->opts;
5519 if (need_put) {
5520 destroy_workqueue(rbd_dev->task_wq);
5521 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5524 rbd_dev_free(rbd_dev);
5527 * This is racy, but way better than putting module outside of
5528 * the release callback. The race window is pretty small, so
5529 * doing something similar to dm (dm-builtin.c) is overkill.
5531 if (need_put)
5532 module_put(THIS_MODULE);
5535 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
5536 struct rbd_spec *spec)
5538 struct rbd_device *rbd_dev;
5540 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5541 if (!rbd_dev)
5542 return NULL;
5544 spin_lock_init(&rbd_dev->lock);
5545 INIT_LIST_HEAD(&rbd_dev->node);
5546 init_rwsem(&rbd_dev->header_rwsem);
5548 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5549 ceph_oid_init(&rbd_dev->header_oid);
5550 rbd_dev->header_oloc.pool = spec->pool_id;
5551 if (spec->pool_ns) {
5552 WARN_ON(!*spec->pool_ns);
5553 rbd_dev->header_oloc.pool_ns =
5554 ceph_find_or_create_string(spec->pool_ns,
5555 strlen(spec->pool_ns));
5558 mutex_init(&rbd_dev->watch_mutex);
5559 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5560 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5562 init_rwsem(&rbd_dev->lock_rwsem);
5563 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5564 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5565 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5566 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5567 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5568 spin_lock_init(&rbd_dev->lock_lists_lock);
5569 INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5570 INIT_LIST_HEAD(&rbd_dev->running_list);
5571 init_completion(&rbd_dev->acquire_wait);
5572 init_completion(&rbd_dev->releasing_wait);
5574 spin_lock_init(&rbd_dev->object_map_lock);
5576 rbd_dev->dev.bus = &rbd_bus_type;
5577 rbd_dev->dev.type = &rbd_device_type;
5578 rbd_dev->dev.parent = &rbd_root_dev;
5579 device_initialize(&rbd_dev->dev);
5581 rbd_dev->rbd_client = rbdc;
5582 rbd_dev->spec = spec;
5584 return rbd_dev;
5588 * Create a mapping rbd_dev.
5590 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5591 struct rbd_spec *spec,
5592 struct rbd_options *opts)
5594 struct rbd_device *rbd_dev;
5596 rbd_dev = __rbd_dev_create(rbdc, spec);
5597 if (!rbd_dev)
5598 return NULL;
5600 rbd_dev->opts = opts;
5602 /* get an id and fill in device name */
5603 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5604 minor_to_rbd_dev_id(1 << MINORBITS),
5606 if (rbd_dev->dev_id < 0)
5607 goto fail_rbd_dev;
5609 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5610 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5612 if (!rbd_dev->task_wq)
5613 goto fail_dev_id;
5615 /* we have a ref from do_rbd_add() */
5616 __module_get(THIS_MODULE);
5618 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5619 return rbd_dev;
5621 fail_dev_id:
5622 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5623 fail_rbd_dev:
5624 rbd_dev_free(rbd_dev);
5625 return NULL;
5628 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5630 if (rbd_dev)
5631 put_device(&rbd_dev->dev);
5635 * Get the size and object order for an image snapshot, or if
5636 * snap_id is CEPH_NOSNAP, gets this information for the base
5637 * image.
5639 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5640 u8 *order, u64 *snap_size)
5642 __le64 snapid = cpu_to_le64(snap_id);
5643 int ret;
5644 struct {
5645 u8 order;
5646 __le64 size;
5647 } __attribute__ ((packed)) size_buf = { 0 };
5649 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5650 &rbd_dev->header_oloc, "get_size",
5651 &snapid, sizeof(snapid),
5652 &size_buf, sizeof(size_buf));
5653 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5656 if (ret < sizeof (size_buf))
5657 return -ERANGE;
5659 if (order) {
5660 *order = size_buf.order;
5661 dout(" order %u", (unsigned int)*order);
5663 *snap_size = le64_to_cpu(size_buf.size);
5665 dout(" snap_id 0x%016llx snap_size = %llu\n",
5666 (unsigned long long)snap_id,
5667 (unsigned long long)*snap_size);
5669 return 0;
5672 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5674 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5675 &rbd_dev->header.obj_order,
5676 &rbd_dev->header.image_size);
5679 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5686 /* Response will be an encoded string, which includes a length */
5687 size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5688 reply_buf = kzalloc(size, GFP_KERNEL);
5689 if (!reply_buf)
5690 return -ENOMEM;
5692 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5693 &rbd_dev->header_oloc, "get_object_prefix",
5694 NULL, 0, reply_buf, size);
5695 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5696 if (ret < 0)
5697 goto out;
5699 p = reply_buf;
5700 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5701 p + ret, NULL, GFP_NOIO);
5704 if (IS_ERR(rbd_dev->header.object_prefix)) {
5705 ret = PTR_ERR(rbd_dev->header.object_prefix);
5706 rbd_dev->header.object_prefix = NULL;
5708 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
5710 out:
5711 kfree(reply_buf);
5713 return ret;
5716 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5717 u64 *snap_features)
5719 __le64 snapid = cpu_to_le64(snap_id);
5720 struct {
5721 __le64 features;
5722 __le64 incompat;
5723 } __attribute__ ((packed)) features_buf = { 0 };
5724 u64 unsup;
5725 int ret;
5727 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5728 &rbd_dev->header_oloc, "get_features",
5729 &snapid, sizeof(snapid),
5730 &features_buf, sizeof(features_buf));
5731 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5734 if (ret < sizeof (features_buf))
5735 return -ERANGE;
5737 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5738 if (unsup) {
5739 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5740 unsup);
5741 return -ENXIO;
5744 *snap_features = le64_to_cpu(features_buf.features);
5746 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5747 (unsigned long long)snap_id,
5748 (unsigned long long)*snap_features,
5749 (unsigned long long)le64_to_cpu(features_buf.incompat));
5751 return 0;
5754 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5756 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5757 &rbd_dev->header.features);
5761 * These are generic image flags, but since they are used only for
5762 * object map, store them in rbd_dev->object_map_flags.
5764 * For the same reason, this function is called only on object map
5765 * (re)load and not on header refresh.
5767 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5769 __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5770 __le64 flags;
5771 int ret;
5773 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5774 &rbd_dev->header_oloc, "get_flags",
5775 &snapid, sizeof(snapid),
5776 &flags, sizeof(flags));
5779 if (ret < sizeof(flags))
5780 return -EBADMSG;
5782 rbd_dev->object_map_flags = le64_to_cpu(flags);
5783 return 0;
5786 struct parent_image_info {
5787 u64 pool_id;
5788 const char *pool_ns;
5789 const char *image_id;
5790 u64 snap_id;
5792 bool has_overlap;
5793 u64 overlap;
5797 * The caller is responsible for @pii.
5799 static int decode_parent_image_spec(void **p, void *end,
5800 struct parent_image_info *pii)
5802 u8 struct_v;
5803 u32 struct_len;
5804 int ret;
5806 ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5807 &struct_v, &struct_len);
5808 if (ret)
5809 return ret;
5811 ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5812 pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5813 if (IS_ERR(pii->pool_ns)) {
5814 ret = PTR_ERR(pii->pool_ns);
5815 pii->pool_ns = NULL;
5818 pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5819 if (IS_ERR(pii->image_id)) {
5820 ret = PTR_ERR(pii->image_id);
5821 pii->image_id = NULL;
5824 ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5825 return 0;
5827 e_inval:
5828 return -EINVAL;
5831 static int __get_parent_info(struct rbd_device *rbd_dev,
5832 struct page *req_page,
5833 struct page *reply_page,
5834 struct parent_image_info *pii)
5836 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5837 size_t reply_len = PAGE_SIZE;
5841 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5842 "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5843 req_page, sizeof(u64), &reply_page, &reply_len);
5844 if (ret)
5845 return ret == -EOPNOTSUPP ? 1 : ret;
5847 p = page_address(reply_page);
5848 end = p + reply_len;
5849 ret = decode_parent_image_spec(&p, end, pii);
5850 if (ret)
5851 return ret;
5853 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5854 "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5855 req_page, sizeof(u64), &reply_page, &reply_len);
5856 if (ret)
5857 return ret;
5859 p = page_address(reply_page);
5860 end = p + reply_len;
5861 ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5862 if (pii->has_overlap)
5863 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5865 return 0;
5867 e_inval:
5868 return -EINVAL;
5872 * The caller is responsible for @pii.
5874 static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5875 struct page *req_page,
5876 struct page *reply_page,
5877 struct parent_image_info *pii)
5879 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5880 size_t reply_len = PAGE_SIZE;
5884 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5885 "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5886 req_page, sizeof(u64), &reply_page, &reply_len);
5887 if (ret)
5888 return ret;
5890 p = page_address(reply_page);
5891 end = p + reply_len;
5892 ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5893 pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5894 if (IS_ERR(pii->image_id)) {
5895 ret = PTR_ERR(pii->image_id);
5896 pii->image_id = NULL;
5899 ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5900 pii->has_overlap = true;
5901 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5903 return 0;
5905 e_inval:
5906 return -EINVAL;
5909 static int get_parent_info(struct rbd_device *rbd_dev,
5910 struct parent_image_info *pii)
5912 struct page *req_page, *reply_page;
5916 req_page = alloc_page(GFP_KERNEL);
5917 if (!req_page)
5918 return -ENOMEM;
5920 reply_page = alloc_page(GFP_KERNEL);
5921 if (!reply_page) {
5922 __free_page(req_page);
5923 return -ENOMEM;
5926 p = page_address(req_page);
5927 ceph_encode_64(&p, rbd_dev->spec->snap_id);
5928 ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5929 if (ret > 0)
5930 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5931 pii);
5933 __free_page(req_page);
5934 __free_page(reply_page);
5935 return ret;
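/*
 * get_parent_info() first tries the "parent_get" class method and
 * falls back to the legacy "get_parent" method when the OSD does not
 * support it; __get_parent_info() maps -EOPNOTSUPP to 1 to signal
 * that fallback.
 */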
5938 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5940 struct rbd_spec *parent_spec;
5941 struct parent_image_info pii = { 0 };
5944 parent_spec = rbd_spec_alloc();
5945 if (!parent_spec)
5946 return -ENOMEM;
5948 ret = get_parent_info(rbd_dev, &pii);
5949 if (ret)
5950 goto out_err;
5952 dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5953 __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5954 pii.has_overlap, pii.overlap);
5956 if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5958 * Either the parent never existed, or we have
5959 * record of it but the image got flattened so it no
5960 * longer has a parent. When the parent of a
5961 * layered image disappears we immediately set the
5962 * overlap to 0. The effect of this is that all new
5963 * requests will be treated as if the image had no
5964 * parent.
5966 * If !pii.has_overlap, the parent image spec is not
5967 * applicable. It's there to avoid duplication in each
5968 * snapshot record.
5970 if (rbd_dev->parent_overlap) {
5971 rbd_dev->parent_overlap = 0;
5972 rbd_dev_parent_put(rbd_dev);
5973 pr_info("%s: clone image has been flattened\n",
5974 rbd_dev->disk->disk_name);
5977 goto out; /* No parent? No problem. */
5980 /* The ceph file layout needs to fit pool id in 32 bits */
5983 if (pii.pool_id > (u64)U32_MAX) {
5984 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5985 (unsigned long long)pii.pool_id, U32_MAX);
5986 ret = -EIO;
5987 goto out_err;
5990 * The parent won't change (except when the clone is
5991 * flattened, already handled that). So we only need to
5992 * record the parent spec if we have not already done so.
5994 if (!rbd_dev->parent_spec) {
5995 parent_spec->pool_id = pii.pool_id;
5996 if (pii.pool_ns && *pii.pool_ns) {
5997 parent_spec->pool_ns = pii.pool_ns;
5998 pii.pool_ns = NULL;
6000 parent_spec->image_id = pii.image_id;
6001 pii.image_id = NULL;
6002 parent_spec->snap_id = pii.snap_id;
6004 rbd_dev->parent_spec = parent_spec;
6005 parent_spec = NULL; /* rbd_dev now owns this */
6009 * We always update the parent overlap. If it's zero we issue
6010 * a warning, as we will proceed as if there was no parent.
6014 /* refresh, careful to warn just once */
6015 if (rbd_dev->parent_overlap)
6016 rbd_warn(rbd_dev,
6017 "clone now standalone (overlap became 0)");
6018 } else {
6019 /* initial probe */
6020 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
6023 rbd_dev->parent_overlap = pii.overlap;
6025 out:
6026 ret = 0;
6027 out_err:
6028 kfree(pii.pool_ns);
6029 kfree(pii.image_id);
6030 rbd_spec_put(parent_spec);
6031 return ret;
6034 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
6036 struct {
6037 __le64 stripe_unit;
6038 __le64 stripe_count;
6039 } __attribute__ ((packed)) striping_info_buf = { 0 };
6040 size_t size = sizeof (striping_info_buf);
6044 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6045 &rbd_dev->header_oloc, "get_stripe_unit_count",
6046 NULL, 0, &striping_info_buf, size);
6047 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6048 if (ret < 0)
6049 return ret;
6050 if (ret < size)
6051 return -ERANGE;
6053 p = &striping_info_buf;
6054 rbd_dev->header.stripe_unit = ceph_decode_64(&p);
6055 rbd_dev->header.stripe_count = ceph_decode_64(&p);
6056 return 0;
6059 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
6061 __le64 data_pool_id;
6064 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6065 &rbd_dev->header_oloc, "get_data_pool",
6066 NULL, 0, &data_pool_id, sizeof(data_pool_id));
6067 if (ret < 0)
6068 return ret;
6069 if (ret < sizeof(data_pool_id))
6070 return -EBADMSG;
6072 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
6073 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
6074 return 0;
6077 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
6079 CEPH_DEFINE_OID_ONSTACK(oid);
6080 size_t image_id_size;
6085 void *reply_buf = NULL;
6087 char *image_name = NULL;
6090 rbd_assert(!rbd_dev->spec->image_name);
6092 len = strlen(rbd_dev->spec->image_id);
6093 image_id_size = sizeof (__le32) + len;
6094 image_id = kmalloc(image_id_size, GFP_KERNEL);
6095 if (!image_id)
6096 return NULL;
6098 p = image_id;
6099 end = image_id + image_id_size;
6100 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
6102 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
6103 reply_buf = kmalloc(size, GFP_KERNEL);
6104 if (!reply_buf)
6105 goto out;
6107 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
6108 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6109 "dir_get_name", image_id, image_id_size,
6114 end = reply_buf + ret;
6116 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
6117 if (IS_ERR(image_name))
6118 image_name = NULL;
6119 else
6120 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
6121 out:
6122 kfree(reply_buf);
6123 kfree(image_id);
6125 return image_name;
6128 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6130 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6131 const char *snap_name;
6132 u32 which = 0;
6134 /* Skip over names until we find the one we are looking for */
6136 snap_name = rbd_dev->header.snap_names;
6137 while (which < snapc->num_snaps) {
6138 if (!strcmp(name, snap_name))
6139 return snapc->snaps[which];
6140 snap_name += strlen(snap_name) + 1;
6141 which++;
6144 return CEPH_NOSNAP;
6146 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6148 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6153 for (which = 0; !found && which < snapc->num_snaps; which++) {
6154 const char *snap_name;
6156 snap_id = snapc->snaps[which];
6157 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
6158 if (IS_ERR(snap_name)) {
6159 /* ignore no-longer existing snapshots */
6160 if (PTR_ERR(snap_name) == -ENOENT)
6161 continue;
6162 else
6163 break;
6165 found = !strcmp(name, snap_name);
6166 kfree(snap_name);
6168 return found ? snap_id : CEPH_NOSNAP;
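/*
 * For format 1 images all snapshot names are kept in the header, so
 * rbd_v1_snap_id_by_name() is a purely in-memory scan; for format 2
 * the name of each snapshot id is fetched from the OSD above.
 */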
6172 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
6173 * no snapshot by that name is found, or if an error occurs.
6175 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6177 if (rbd_dev->image_format == 1)
6178 return rbd_v1_snap_id_by_name(rbd_dev, name);
6180 return rbd_v2_snap_id_by_name(rbd_dev, name);
6184 * An image being mapped will have everything but the snap id.
6186 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
6188 struct rbd_spec *spec = rbd_dev->spec;
6190 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
6191 rbd_assert(spec->image_id && spec->image_name);
6192 rbd_assert(spec->snap_name);
6194 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
6197 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
6198 if (snap_id == CEPH_NOSNAP)
6199 return -ENOENT;
6201 spec->snap_id = snap_id;
6203 spec->snap_id = CEPH_NOSNAP;
6206 return 0;
6210 * A parent image will have all ids but none of the names.
6212 * All names in an rbd spec are dynamically allocated. It's OK if we
6213 * can't figure out the name for an image id.
6215 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
6217 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6218 struct rbd_spec *spec = rbd_dev->spec;
6219 const char *pool_name;
6220 const char *image_name;
6221 const char *snap_name;
6224 rbd_assert(spec->pool_id != CEPH_NOPOOL);
6225 rbd_assert(spec->image_id);
6226 rbd_assert(spec->snap_id != CEPH_NOSNAP);
6228 /* Get the pool name; we have to make our own copy of this */
6230 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6232 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6235 pool_name = kstrdup(pool_name, GFP_KERNEL);
6236 if (!pool_name)
6237 return -ENOMEM;
6239 /* Fetch the image name; tolerate failure here */
6241 image_name = rbd_dev_image_name(rbd_dev);
6243 rbd_warn(rbd_dev, "unable to get image name");
6245 /* Fetch the snapshot name */
6247 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6248 if (IS_ERR(snap_name)) {
6249 ret = PTR_ERR(snap_name);
6250 goto out_err;
6253 spec->pool_name = pool_name;
6254 spec->image_name = image_name;
6255 spec->snap_name = snap_name;
6257 return 0;
6259 out_err:
6260 kfree(image_name);
6261 kfree(pool_name);
6262 return ret;
6265 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6267 size_t size;
6268 int ret;
6269 void *reply_buf;
6270 void *p;
6271 void *end;
6272 u64 seq;
6273 u32 snap_count;
6274 struct ceph_snap_context *snapc;
6275 u32 i;
6278 * We'll need room for the seq value (maximum snapshot id),
6279 * snapshot count, and array of that many snapshot ids.
6280 * For now we have a fixed upper limit on the number we're
6281 * prepared to receive.
6283 size = sizeof (__le64) + sizeof (__le32) +
6284 RBD_MAX_SNAP_COUNT * sizeof (__le64);
6285 reply_buf = kzalloc(size, GFP_KERNEL);
6286 if (!reply_buf)
6287 return -ENOMEM;
6289 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6290 &rbd_dev->header_oloc, "get_snapcontext",
6291 NULL, 0, reply_buf, size);
6292 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6293 if (ret < 0)
6294 goto out;
6296 p = reply_buf;
6297 end = reply_buf + ret;
6299 ceph_decode_64_safe(&p, end, seq, out);
6300 ceph_decode_32_safe(&p, end, snap_count, out);
6303 * Make sure the reported number of snapshot ids wouldn't go
6304 * beyond the end of our buffer. But before checking that,
6305 * make sure the computed size of the snapshot context we
6306 * allocate is representable in a size_t.
6307 ret = -EINVAL;
6308 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6309 / sizeof (u64))
6310 goto out;
6313 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6314 goto out;
6315 ret = 0;
6317 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6318 if (!snapc) {
6319 ret = -ENOMEM;
6320 goto out;
6322 snapc->seq = seq;
6323 for (i = 0; i < snap_count; i++)
6324 snapc->snaps[i] = ceph_decode_64(&p);
6326 ceph_put_snap_context(rbd_dev->header.snapc);
6327 rbd_dev->header.snapc = snapc;
6329 dout(" snap context seq = %llu, snap_count = %u\n",
6330 (unsigned long long)seq, (unsigned int)snap_count);
6331 out:
6332 kfree(reply_buf);
6334 return ret;
6337 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6338 u64 snap_id)
6348 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6349 reply_buf = kmalloc(size, GFP_KERNEL);
6350 if (!reply_buf)
6351 return ERR_PTR(-ENOMEM);
6353 snapid = cpu_to_le64(snap_id);
6354 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6355 &rbd_dev->header_oloc, "get_snapshot_name",
6356 &snapid, sizeof(snapid), reply_buf, size);
6357 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6358 if (ret < 0) {
6359 snap_name = ERR_PTR(ret);
6360 goto out;
6363 p = reply_buf;
6364 end = reply_buf + ret;
6365 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6366 if (IS_ERR(snap_name))
6369 dout(" snap_id 0x%016llx snap_name = %s\n",
6370 (unsigned long long)snap_id, snap_name);
6371 out:
6372 kfree(reply_buf);
6374 return snap_name;
6377 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6379 bool first_time = rbd_dev->header.object_prefix == NULL;
6382 ret = rbd_dev_v2_image_size(rbd_dev);
6383 if (ret)
6384 return ret;
6386 if (first_time) {
6387 ret = rbd_dev_v2_header_onetime(rbd_dev);
6392 ret = rbd_dev_v2_snap_context(rbd_dev);
6393 if (ret && first_time) {
6394 kfree(rbd_dev->header.object_prefix);
6395 rbd_dev->header.object_prefix = NULL;
6398 return ret;
6401 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6403 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6405 if (rbd_dev->image_format == 1)
6406 return rbd_dev_v1_header_info(rbd_dev);
6408 return rbd_dev_v2_header_info(rbd_dev);
6412 * Skips over white space at *buf, and updates *buf to point to the
6413 * first found non-space character (if any). Returns the length of
6414 * the token (string of non-white space characters) found. Note
6415 * that *buf must be terminated with '\0'.
6417 static inline size_t next_token(const char **buf)
6420 * These are the characters that produce nonzero for
6421 * isspace() in the "C" and "POSIX" locales.
6423 const char *spaces = " \f\n\r\t\v";
6425 *buf += strspn(*buf, spaces); /* Find start of token */
6427 return strcspn(*buf, spaces); /* Return token length */
6431 * Finds the next token in *buf, dynamically allocates a buffer big
6432 * enough to hold a copy of it, and copies the token into the new
6433 * buffer. The copy is guaranteed to be terminated with '\0'. Note
6434 * that a duplicate buffer is created even for a zero-length token.
6436 * Returns a pointer to the newly-allocated duplicate, or a null
6437 * pointer if memory for the duplicate was not available. If
6438 * the lenp argument is a non-null pointer, the length of the token
6439 * (not including the '\0') is returned in *lenp.
6441 * If successful, the *buf pointer will be updated to point beyond
6442 * the end of the found token.
6444 * Note: uses GFP_KERNEL for allocation.
6446 static inline char *dup_token(const char **buf, size_t *lenp)
6451 len = next_token(buf);
6452 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6453 if (!dup)
6454 return NULL;
6455 *(dup + len) = '\0';
6456 *buf += len;
6465 * Parse the options provided for an "rbd add" (i.e., rbd image
6466 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
6467 * and the data written is passed here via a NUL-terminated buffer.
6468 * Returns 0 if successful or an error code otherwise.
6470 * The information extracted from these options is recorded in
6471 * the other parameters which return dynamically-allocated
6472 * structures:
6474 * The address of a pointer that will refer to a ceph options
6475 * structure. Caller must release the returned pointer using
6476 * ceph_destroy_options() when it is no longer needed.
6478 * Address of an rbd options pointer. Fully initialized by
6479 * this function; caller must release with kfree().
6481 * Address of an rbd image specification pointer. Fully
6482 * initialized by this function based on parsed options.
6483 * Caller must release with rbd_spec_put().
6485 * The options passed take this form:
6486 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
6489 * A comma-separated list of one or more monitor addresses.
6490 * A monitor address is an ip address, optionally followed
6491 * by a port number (separated by a colon).
6492 * I.e.: ip1[:port1][,ip2[:port2]...]
6494 * A comma-separated list of ceph and/or rbd options.
6496 * The name of the rados pool containing the rbd image.
6498 * The name of the image in that pool to map.
6500 * An optional snapshot id. If provided, the mapping will
6501 * present data from the image at the time that snapshot was
6502 * created. The image head is used if no snapshot id is
6503 * provided. Snapshot mappings are always read-only.
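 *
 * For example (illustrative values only):
 *
 *	1.2.3.4:6789 name=admin,queue_depth=16 mypool myimage mysnap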
6505 static int rbd_add_parse_args(const char *buf,
6506 struct ceph_options **ceph_opts,
6507 struct rbd_options **opts,
6508 struct rbd_spec **rbd_spec)
6511 char *options;
6512 const char *mon_addrs;
6513 char *snap_name;
6514 size_t mon_addrs_size;
6515 struct parse_rbd_opts_ctx pctx = { 0 };
6516 struct ceph_options *copts;
6517 int ret;
6519 /* The first four tokens are required */
6521 len = next_token(&buf);
6523 rbd_warn(NULL, "no monitor address(es) provided");
6527 mon_addrs_size = len + 1;
6528 buf += len;
6530 ret = -EINVAL;
6531 options = dup_token(&buf, NULL);
6535 rbd_warn(NULL, "no options provided");
6539 pctx.spec = rbd_spec_alloc();
6540 if (!pctx.spec)
6541 goto out_mem;
6543 pctx.spec->pool_name = dup_token(&buf, NULL);
6544 if (!pctx.spec->pool_name)
6545 goto out_mem;
6546 if (!*pctx.spec->pool_name) {
6547 rbd_warn(NULL, "no pool name provided");
6548 goto out_err;
6551 pctx.spec->image_name = dup_token(&buf, NULL);
6552 if (!pctx.spec->image_name)
6553 goto out_mem;
6554 if (!*pctx.spec->image_name) {
6555 rbd_warn(NULL, "no image name provided");
6556 goto out_err;
6560 * Snapshot name is optional; default is to use "-"
6561 * (indicating the head/no snapshot).
6563 len = next_token(&buf);
6565 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6566 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6567 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6568 ret = -ENAMETOOLONG;
6569 goto out_err;
6571 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6572 if (!snap_name)
6573 goto out_mem;
6574 *(snap_name + len) = '\0';
6575 pctx.spec->snap_name = snap_name;
6577 /* Initialize all rbd options to the defaults */
6579 pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6580 if (!pctx.opts)
6581 goto out_mem;
6583 pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6584 pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6585 pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6586 pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6587 pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6588 pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6589 pctx.opts->trim = RBD_TRIM_DEFAULT;
6591 copts = ceph_parse_options(options, mon_addrs,
6592 mon_addrs + mon_addrs_size - 1,
6593 parse_rbd_opts_token, &pctx);
6594 if (IS_ERR(copts)) {
6595 ret = PTR_ERR(copts);
6596 goto out_err;
6600 *ceph_opts = copts;
6601 *opts = pctx.opts;
6602 *rbd_spec = pctx.spec;
6604 return 0;
6606 out_mem:
6607 ret = -ENOMEM;
6608 out_err:
6609 rbd_spec_put(pctx.spec);
6610 kfree(options);
6612 return ret;
6615 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6617 down_write(&rbd_dev->lock_rwsem);
6618 if (__rbd_is_lock_owner(rbd_dev))
6619 __rbd_release_lock(rbd_dev);
6620 up_write(&rbd_dev->lock_rwsem);
6624 * If the wait is interrupted, an error is returned even if the lock
6625 * was successfully acquired. rbd_dev_image_unlock() will release it
6626 * if needed.
6628 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6632 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6633 if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6634 return 0;
6636 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6640 if (rbd_is_snap(rbd_dev))
6641 return 0;
6643 rbd_assert(!rbd_is_lock_owner(rbd_dev));
6644 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6645 ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6646 ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6647 if (ret > 0) {
6648 ret = rbd_dev->acquire_err;
6649 } else {
6650 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6651 if (!ret)
6652 ret = -ETIMEDOUT;
6656 rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6661 * The lock may have been released by now, unless automatic lock
6662 * transitions are disabled.
6664 rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6665 return 0;
6669 * An rbd format 2 image has a unique identifier, distinct from the
6670 * name given to it by the user. Internally, that identifier is
6671 * what's used to specify the names of objects related to the image.
6673 * A special "rbd id" object is used to map an rbd image name to its
6674 * id. If that object doesn't exist, then there is no v2 rbd image
6675 * with the supplied name.
6677 * This function will record the given rbd_dev's image_id field if
6678 * it can be determined, and in that case will return 0. If any
6679 * errors occur a negative errno will be returned and the rbd_dev's
6680 * image_id field will be unchanged (and should be NULL).
6682 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6686 CEPH_DEFINE_OID_ONSTACK(oid);
6691 * When probing a parent image, the image id is already
6692 * known (and the image name likely is not). There's no
6693 * need to fetch the image id again in this case. We
6694 * do still need to set the image format though.
6696 if (rbd_dev->spec->image_id) {
6697 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6698 return 0;
6703 * First, see if the format 2 image id file exists, and if
6704 * so, get the image's persistent id from it.
6706 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6707 rbd_dev->spec->image_name);
6708 if (ret)
6709 return ret;
6711 dout("rbd id object name is %s\n", oid.name);
6713 /* Response will be an encoded string, which includes a length */
6714 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6715 response = kzalloc(size, GFP_NOIO);
6716 if (!response) {
6717 ret = -ENOMEM;
6718 goto out;
6721 /* If it doesn't exist we'll assume it's a format 1 image */
6723 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6726 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6727 if (ret == -ENOENT) {
6728 image_id = kstrdup("", GFP_KERNEL);
6729 ret = image_id ? 0 : -ENOMEM;
6730 if (!ret)
6731 rbd_dev->image_format = 1;
6732 } else if (ret >= 0) {
6735 image_id = ceph_extract_encoded_string(&p, p + ret,
6737 ret = PTR_ERR_OR_ZERO(image_id);
6738 if (!ret)
6739 rbd_dev->image_format = 2;
6742 if (!ret) {
6743 rbd_dev->spec->image_id = image_id;
6744 dout("image_id is %s\n", image_id);
6746 out:
6747 kfree(response);
6748 ceph_oid_destroy(&oid);
6749 return ret;
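/*
 * Note that a format 1 image is recorded with an empty image id
 * string; both the format detection above and the short-circuit at
 * the top of rbd_dev_image_id() rely on *image_id being '\0' to
 * mean format 1.
 */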
6753 * Undo whatever state changes are made by v1 or v2 header info
6754 * routines.
6756 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6758 struct rbd_image_header *header;
6760 rbd_dev_parent_put(rbd_dev);
6761 rbd_object_map_free(rbd_dev);
6762 rbd_dev_mapping_clear(rbd_dev);
6764 /* Free dynamic fields from the header, then zero it out */
6766 header = &rbd_dev->header;
6767 ceph_put_snap_context(header->snapc);
6768 kfree(header->snap_sizes);
6769 kfree(header->snap_names);
6770 kfree(header->object_prefix);
6771 memset(header, 0, sizeof (*header));
6774 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6778 ret = rbd_dev_v2_object_prefix(rbd_dev);
6779 if (ret)
6780 return ret;
6783 * Get and check the features for the image. Currently the
6784 * features are assumed to never change.
6786 ret = rbd_dev_v2_features(rbd_dev);
6787 if (ret)
6788 goto out_err;
6790 /* If the image supports fancy striping, get its parameters */
6792 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6793 ret = rbd_dev_v2_striping_info(rbd_dev);
6794 if (ret)
6795 goto out_err;
6798 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6799 ret = rbd_dev_v2_data_pool(rbd_dev);
6800 if (ret)
6801 goto out_err;
6804 rbd_init_layout(rbd_dev);
6805 return 0;
6807 out_err:
6808 rbd_dev->header.features = 0;
6809 kfree(rbd_dev->header.object_prefix);
6810 rbd_dev->header.object_prefix = NULL;
6811 return ret;
6815 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6816 * rbd_dev_image_probe() recursion depth, which means it's also the
6817 * length of the already discovered part of the parent chain.
6819 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6821 struct rbd_device *parent = NULL;
6822 int ret;
6824 if (!rbd_dev->parent_spec)
6825 return 0;
6827 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6828 pr_info("parent chain is too long (%d)\n", depth);
6833 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6834 if (!parent) {
6835 ret = -ENOMEM;
6836 goto out_err;
6840 * Images related by parent/child relationships always share
6841 * rbd_client and spec/parent_spec, so bump their refcounts.
6843 __rbd_get_client(rbd_dev->rbd_client);
6844 rbd_spec_get(rbd_dev->parent_spec);
6846 ret = rbd_dev_image_probe(parent, depth);
6847 if (ret < 0)
6848 goto out_err;
6850 rbd_dev->parent = parent;
6851 atomic_set(&rbd_dev->parent_ref, 1);
6852 return 0;
6854 out_err:
6855 rbd_dev_unparent(rbd_dev);
6856 rbd_dev_destroy(parent);
6857 return ret;
6860 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6862 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6863 rbd_free_disk(rbd_dev);
6864 if (!single_major)
6865 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6869 * rbd_dev->header_rwsem must be locked for write and will be unlocked
6870 * upon return.
6872 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6876 /* Record our major and minor device numbers. */
6878 if (!single_major) {
6879 ret = register_blkdev(0, rbd_dev->name);
6880 if (ret < 0)
6881 goto err_out_unlock;
6883 rbd_dev->major = ret;
6884 rbd_dev->minor = 0;
6885 } else {
6886 rbd_dev->major = rbd_major;
6887 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6890 /* Set up the blkdev mapping. */
6892 ret = rbd_init_disk(rbd_dev);
6893 if (ret)
6894 goto err_out_blkdev;
6896 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6897 set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
6899 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6900 if (ret)
6901 goto err_out_disk;
6903 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6904 up_write(&rbd_dev->header_rwsem);
6905 return 0;
6907 err_out_disk:
6908 rbd_free_disk(rbd_dev);
6909 err_out_blkdev:
6910 if (!single_major)
6911 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6912 err_out_unlock:
6913 up_write(&rbd_dev->header_rwsem);
6914 return ret;
6917 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6919 struct rbd_spec *spec = rbd_dev->spec;
6922 /* Record the header object name for this rbd image. */
6924 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6925 if (rbd_dev->image_format == 1)
6926 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6927 spec->image_name, RBD_SUFFIX);
6929 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6930 RBD_HEADER_PREFIX, spec->image_id);
6932 return ret;
6935 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6937 rbd_dev_unprobe(rbd_dev);
6939 rbd_unregister_watch(rbd_dev);
6940 rbd_dev->image_format = 0;
6941 kfree(rbd_dev->spec->image_id);
6942 rbd_dev->spec->image_id = NULL;
6946 * Probe for the existence of the header object for the given rbd
6947 * device. If this image is the one being mapped (i.e., not a
6948 * parent), initiate a watch on its header object before using that
6949 * object to get detailed information about the rbd image.
6951 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6956 * Get the id from the image id object. Unless there's an
6957 * error, rbd_dev->spec->image_id will be filled in with
6958 * a dynamically-allocated string, and rbd_dev->image_format
6959 * will be set to either 1 or 2.
6961 ret = rbd_dev_image_id(rbd_dev);
6962 if (ret)
6963 return ret;
6965 ret = rbd_dev_header_name(rbd_dev);
6967 goto err_out_format;
6969 if (!depth) {
6970 ret = rbd_register_watch(rbd_dev);
6971 if (ret) {
6972 if (ret == -ENOENT)
6973 pr_info("image %s/%s%s%s does not exist\n",
6974 rbd_dev->spec->pool_name,
6975 rbd_dev->spec->pool_ns ?: "",
6976 rbd_dev->spec->pool_ns ? "/" : "",
6977 rbd_dev->spec->image_name);
6978 goto err_out_format;
6982 ret = rbd_dev_header_info(rbd_dev);
6983 if (ret)
6984 goto err_out_watch;
6987 * If this image is the one being mapped, we have pool name and
6988 * id, image name and id, and snap name - need to fill snap id.
6989 * Otherwise this is a parent image, identified by pool, image
6990 * and snap ids - need to fill in names for those ids.
6992 if (!depth)
6993 ret = rbd_spec_fill_snap_id(rbd_dev);
6994 else
6995 ret = rbd_spec_fill_names(rbd_dev);
6996 if (ret) {
6997 if (ret == -ENOENT)
6998 pr_info("snap %s/%s%s%s@%s does not exist\n",
6999 rbd_dev->spec->pool_name,
7000 rbd_dev->spec->pool_ns ?: "",
7001 rbd_dev->spec->pool_ns ? "/" : "",
7002 rbd_dev->spec->image_name,
7003 rbd_dev->spec->snap_name);
7004 goto err_out_probe;
7007 ret = rbd_dev_mapping_set(rbd_dev);
7008 if (ret)
7009 goto err_out_probe;
7011 if (rbd_is_snap(rbd_dev) &&
7012 (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
7013 ret = rbd_object_map_load(rbd_dev);
7014 if (ret)
7015 goto err_out_probe;
7018 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
7019 ret = rbd_dev_v2_parent_info(rbd_dev);
7020 if (ret)
7021 goto err_out_probe;
7024 ret = rbd_dev_probe_parent(rbd_dev, depth);
7025 if (ret)
7026 goto err_out_probe;
7028 dout("discovered format %u image, header name is %s\n",
7029 rbd_dev->image_format, rbd_dev->header_oid.name);
7030 return 0;
7032 err_out_probe:
7033 rbd_dev_unprobe(rbd_dev);
7034 err_out_watch:
7035 if (!depth)
7036 rbd_unregister_watch(rbd_dev);
7037 err_out_format:
7038 rbd_dev->image_format = 0;
7039 kfree(rbd_dev->spec->image_id);
7040 rbd_dev->spec->image_id = NULL;
7041 return ret;
7044 static ssize_t do_rbd_add(struct bus_type *bus,
7045 const char *buf,
7046 size_t count)
7048 struct rbd_device *rbd_dev = NULL;
7049 struct ceph_options *ceph_opts = NULL;
7050 struct rbd_options *rbd_opts = NULL;
7051 struct rbd_spec *spec = NULL;
7052 struct rbd_client *rbdc;
7053 int rc;
7055 if (!try_module_get(THIS_MODULE))
7056 return -ENODEV;
7058 /* parse add command */
7059 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
7060 if (rc < 0)
7061 goto out;
7063 rbdc = rbd_get_client(ceph_opts);
7064 if (IS_ERR(rbdc)) {
7065 rc = PTR_ERR(rbdc);
7066 goto err_out_args;
7070 rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7073 pr_info("pool %s does not exist\n", spec->pool_name);
7074 goto err_out_client;
7076 spec->pool_id = (u64)rc;
7078 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7079 if (!rbd_dev) {
7080 rc = -ENOMEM;
7081 goto err_out_client;
7083 rbdc = NULL; /* rbd_dev now owns this */
7084 spec = NULL; /* rbd_dev now owns this */
7085 rbd_opts = NULL; /* rbd_dev now owns this */
7087 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7088 if (!rbd_dev->config_info) {
7090 goto err_out_rbd_dev;
7093 down_write(&rbd_dev->header_rwsem);
7094 rc = rbd_dev_image_probe(rbd_dev, 0);
7095 if (rc < 0) {
7096 up_write(&rbd_dev->header_rwsem);
7097 goto err_out_rbd_dev;
7100 /* If we are mapping a snapshot it must be marked read-only */
7101 if (rbd_is_snap(rbd_dev))
7102 rbd_dev->opts->read_only = true;
7104 if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7105 rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7106 rbd_dev->layout.object_size);
7107 rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7110 rc = rbd_dev_device_setup(rbd_dev);
7111 if (rc)
7112 goto err_out_image_probe;
7114 rc = rbd_add_acquire_lock(rbd_dev);
7115 if (rc)
7116 goto err_out_image_lock;
7118 /* Everything's ready. Announce the disk to the world. */
7120 rc = device_add(&rbd_dev->dev);
7121 if (rc)
7122 goto err_out_image_lock;
7124 add_disk(rbd_dev->disk);
7125 /* see rbd_init_disk() */
7126 blk_put_queue(rbd_dev->disk->queue);
7128 spin_lock(&rbd_dev_list_lock);
7129 list_add_tail(&rbd_dev->node, &rbd_dev_list);
7130 spin_unlock(&rbd_dev_list_lock);
7132 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7133 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7134 rbd_dev->header.features);
7135 rc = count;
7136 out:
7137 module_put(THIS_MODULE);
7138 return rc;
7140 err_out_image_lock:
7141 rbd_dev_image_unlock(rbd_dev);
7142 rbd_dev_device_release(rbd_dev);
7143 err_out_image_probe:
7144 rbd_dev_image_release(rbd_dev);
7145 err_out_rbd_dev:
7146 rbd_dev_destroy(rbd_dev);
7147 err_out_client:
7148 rbd_put_client(rbdc);
7149 err_out_args:
7150 rbd_spec_put(spec);
7151 kfree(rbd_opts);
7152 goto out;
7155 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
7157 if (single_major)
7158 return -EINVAL;
7160 return do_rbd_add(bus, buf, count);
7163 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
7166 return do_rbd_add(bus, buf, count);
7169 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7171 while (rbd_dev->parent) {
7172 struct rbd_device *first = rbd_dev;
7173 struct rbd_device *second = first->parent;
7174 struct rbd_device *third;
7177 * Follow to the parent with no grandparent and
7178 * remove it.
7180 while (second && (third = second->parent)) {
7181 first = second;
7182 second = third;
7185 rbd_dev_image_release(second);
7186 rbd_dev_destroy(second);
7187 first->parent = NULL;
7188 first->parent_overlap = 0;
7190 rbd_assert(first->parent_spec);
7191 rbd_spec_put(first->parent_spec);
7192 first->parent_spec = NULL;
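/*
 * rbd_dev_remove_parent() tears the chain down deepest ancestor
 * first: each pass walks to the parent that has no grandparent,
 * releases it, and detaches it from its child.
 */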
7196 static ssize_t do_rbd_remove(struct bus_type *bus,
7197 const char *buf,
7198 size_t count)
7200 struct rbd_device *rbd_dev = NULL;
7201 struct list_head *tmp;
7202 int dev_id;
7203 char opt_buf[6];
7204 bool force = false;
7205 int ret;
7207 dev_id = -1;
7208 opt_buf[0] = '\0';
7209 sscanf(buf, "%d %5s", &dev_id, opt_buf);
7211 pr_err("dev_id out of range\n");
7214 if (opt_buf[0] != '\0') {
7215 if (!strcmp(opt_buf, "force")) {
7218 pr_err("bad remove option at '%s'\n", opt_buf);
7223 ret = -ENOENT;
7224 spin_lock(&rbd_dev_list_lock);
7225 list_for_each(tmp, &rbd_dev_list) {
7226 rbd_dev = list_entry(tmp, struct rbd_device, node);
7227 if (rbd_dev->dev_id == dev_id) {
7228 ret = 0;
7229 break;
7232 if (!ret) {
7233 spin_lock_irq(&rbd_dev->lock);
7234 if (rbd_dev->open_count && !force)
7235 ret = -EBUSY;
7236 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7237 &rbd_dev->flags))
7238 ret = -EINPROGRESS;
7239 spin_unlock_irq(&rbd_dev->lock);
7241 spin_unlock(&rbd_dev_list_lock);
7242 if (ret)
7243 return ret;
7247 * Prevent new IO from being queued and wait for existing
7248 * IO to complete/fail.
7250 blk_mq_freeze_queue(rbd_dev->disk->queue);
7251 blk_set_queue_dying(rbd_dev->disk->queue);
7254 del_gendisk(rbd_dev->disk);
7255 spin_lock(&rbd_dev_list_lock);
7256 list_del_init(&rbd_dev->node);
7257 spin_unlock(&rbd_dev_list_lock);
7258 device_del(&rbd_dev->dev);
7260 rbd_dev_image_unlock(rbd_dev);
7261 rbd_dev_device_release(rbd_dev);
7262 rbd_dev_image_release(rbd_dev);
7263 rbd_dev_destroy(rbd_dev);
7264 return count;
7267 static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
7269 if (single_major)
7270 return -EINVAL;
7272 return do_rbd_remove(bus, buf, count);
7275 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
7278 return do_rbd_remove(bus, buf, count);
7282 * create control files in sysfs
7285 static int __init rbd_sysfs_init(void)
7289 ret = device_register(&rbd_root_dev);
7290 if (ret < 0)
7291 return ret;
7293 ret = bus_register(&rbd_bus_type);
7294 if (ret < 0)
7295 device_unregister(&rbd_root_dev);
7297 return ret;
7300 static void __exit rbd_sysfs_cleanup(void)
7302 bus_unregister(&rbd_bus_type);
7303 device_unregister(&rbd_root_dev);
7306 static int __init rbd_slab_init(void)
7308 rbd_assert(!rbd_img_request_cache);
7309 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7310 if (!rbd_img_request_cache)
7311 return -ENOMEM;
7313 rbd_assert(!rbd_obj_request_cache);
7314 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7315 if (!rbd_obj_request_cache)
7316 goto out_err;
7318 return 0;
7320 out_err:
7321 kmem_cache_destroy(rbd_img_request_cache);
7322 rbd_img_request_cache = NULL;
7323 return -ENOMEM;
7326 static void rbd_slab_exit(void)
7328 rbd_assert(rbd_obj_request_cache);
7329 kmem_cache_destroy(rbd_obj_request_cache);
7330 rbd_obj_request_cache = NULL;
7332 rbd_assert(rbd_img_request_cache);
7333 kmem_cache_destroy(rbd_img_request_cache);
7334 rbd_img_request_cache = NULL;
7337 static int __init rbd_init(void)
7341 if (!libceph_compatible(NULL)) {
7342 rbd_warn(NULL, "libceph incompatibility (quitting)");
7346 rc = rbd_slab_init();
7347 if (rc)
7348 return rc;
7351 * The number of active work items is limited by the number of
7352 * rbd devices * queue depth, so leave @max_active at default.
7354 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7355 if (!rbd_wq) {
7356 rc = -ENOMEM;
7357 goto err_out_slab;
7360 if (single_major) {
7361 rbd_major = register_blkdev(0, RBD_DRV_NAME);
7362 if (rbd_major < 0) {
7363 rc = rbd_major;
7364 goto err_out_wq;
7368 rc = rbd_sysfs_init();
7369 if (rc)
7370 goto err_out_blkdev;
7373 pr_info("loaded (major %d)\n", rbd_major);
7375 pr_info("loaded\n");
7381 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7382 err_out_wq:
7383 destroy_workqueue(rbd_wq);
7384 err_out_slab:
7385 rbd_slab_exit();
7386 return rc;
7389 static void __exit rbd_exit(void)
7391 ida_destroy(&rbd_dev_id_ida);
7392 rbd_sysfs_cleanup();
7393 if (single_major)
7394 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7395 destroy_workqueue(rbd_wq);
7396 rbd_slab_exit();
7399 module_init(rbd_init);
7400 module_exit(rbd_exit);
7402 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7403 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7404 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7405 /* following authorship retained from original osdblk.c */
7406 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7408 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7409 MODULE_LICENSE("GPL");