/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/striper.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */
/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
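/*
 * Illustrative sketch (not part of the driver): the two helpers above
 * implement a counter that saturates instead of wrapping -- an increment
 * of a counter that is 0 (already torn down) fails, and so does one that
 * would exceed INT_MAX.  A hypothetical caller pattern, modelled on the
 * parent_ref handling further below:
 */
#if 0
static void example_get_put(atomic_t *refs)
{
	if (atomic_inc_return_safe(refs) > 0) {
		/* resource is live, use it... then drop the reference */
		if (atomic_dec_return_safe(refs) < 0)
			pr_warn("reference underflow\n");
	}
}
#endif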
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)
/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_OBJECT_MAP		(1ULL<<3)
#define RBD_FEATURE_FAST_DIFF		(1ULL<<4)
#define RBD_FEATURE_DEEP_FLATTEN	(1ULL<<5)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_OBJECT_MAP |	\
				 RBD_FEATURE_FAST_DIFF |	\
				 RBD_FEATURE_DEEP_FLATTEN |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;
	const char	*pool_ns;	/* NULL if default, never "" */

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
struct pending_result {
	int			result;		/* first nonzero result */
	int			num_pending;
};

struct rbd_img_request;
enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};
enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
	OBJ_OP_ZEROOUT,
};
#define RBD_OBJ_FLAG_DELETION			(1U << 0)
#define RBD_OBJ_FLAG_COPYUP_ENABLED		(1U << 1)
#define RBD_OBJ_FLAG_COPYUP_ZEROS		(1U << 2)
#define RBD_OBJ_FLAG_MAY_EXIST			(1U << 3)
#define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT	(1U << 4)
enum rbd_obj_read_state {
	RBD_OBJ_READ_START = 1,
	RBD_OBJ_READ_OBJECT,
	RBD_OBJ_READ_PARENT,
};
/*
 * Writes go through the following state machine to deal with
 * layering:
 *
 * [state diagram, partially recovered: RBD_OBJ_WRITE_GUARD leads to
 *  RBD_OBJ_WRITE_READ_FROM_PARENT, then RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC
 *  and RBD_OBJ_WRITE_COPYUP_OPS before reaching done; shortcuts exist
 *  for a flattened image, for deep-copyup not being needed, and for
 *  copyup reading from the parent]
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * assert_exists guard is needed or not (in some cases it's not needed
 * even if there is a parent).
 */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_START = 1,
	RBD_OBJ_WRITE_PRE_OBJECT_MAP,
	RBD_OBJ_WRITE_OBJECT,
	__RBD_OBJ_WRITE_COPYUP,
	RBD_OBJ_WRITE_COPYUP,
	RBD_OBJ_WRITE_POST_OBJECT_MAP,
};
enum rbd_obj_copyup_state {
	RBD_OBJ_COPYUP_START = 1,
	RBD_OBJ_COPYUP_READ_PARENT,
	__RBD_OBJ_COPYUP_OBJECT_MAPS,
	RBD_OBJ_COPYUP_OBJECT_MAPS,
	__RBD_OBJ_COPYUP_WRITE_OBJECT,
	RBD_OBJ_COPYUP_WRITE_OBJECT,
};
struct rbd_obj_request {
	struct ceph_object_extent ex;
	unsigned int		flags;	/* RBD_OBJ_FLAG_* */
	union {
		enum rbd_obj_read_state	 read_state;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	struct rbd_img_request	*img_request;
	struct ceph_file_extent	*img_extents;
	u32			num_img_extents;

	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
			u32			bvec_idx;
		};
	};

	enum rbd_obj_copyup_state copyup_state;
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct list_head	osd_reqs;	/* w/ r_private_item */

	struct mutex		state_mutex;
	struct pending_result	pending;
	struct kref		kref;
};
enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

enum rbd_img_state {
	RBD_IMG_START = 1,
	RBD_IMG_EXCLUSIVE_LOCK,
	__RBD_IMG_OBJECT_REQUESTS,
	RBD_IMG_OBJECT_REQUESTS,
};
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	enum obj_request_type	data_type;
	unsigned long		flags;
	enum rbd_img_state	state;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};

	struct list_head	lock_item;
	struct list_head	object_extents;	/* obj_req.ex structs */

	struct mutex		state_mutex;
	struct pending_result	pending;
	struct work_struct	work;
	int			work_result;
	struct kref		kref;
};
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};
/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};
struct rbd_mapping {
	u64			size;
	u64			features;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	spinlock_t		lock_lists_lock;
	struct list_head	acquiring_list;
	struct list_head	running_list;
	struct completion	acquire_wait;
	int			acquire_err;
	struct completion	releasing_wait;

	spinlock_t		object_map_lock;
	u8			*object_map;
	u64			object_map_size;	/* in objects */
	u64			object_map_flags;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};
/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_READONLY,	/* -o ro or snapshot */
};
static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

static struct ceph_snap_context rbd_empty_snapc = {
	.nref = REFCOUNT_INIT(1),
};
/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 */
static bool single_major = true;
module_param(single_major, bool, 0444);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
static ssize_t remove_store(struct bus_type *bus, const char *buf,
			    size_t count);
static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
				      size_t count);
static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
					 size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}
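/*
 * Illustrative sketch (not part of the driver): with
 * RBD_SINGLE_MAJOR_PART_SHIFT == 4, each device owns 16 consecutive
 * minors -- the whole-disk minor plus 15 partition minors.
 */
#if 0
static void example_minor_mapping(void)
{
	/* dev_id 3 -> minors 48..63; partition rbd3p2 is minor 50 */
	WARN_ON(rbd_dev_id_to_minor(3) != 48);
	WARN_ON(minor_to_rbd_dev_id(50) != 3);
}
#endif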
static bool rbd_is_ro(struct rbd_device *rbd_dev)
{
	return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
}

static bool rbd_is_snap(struct rbd_device *rbd_dev)
{
	return rbd_dev->spec->snap_id != CEPH_NOSNAP;
}

static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	lockdep_assert_held(&rbd_dev->lock_rwsem);

	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}
static ssize_t supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR_WO(add);
static BUS_ATTR_WO(remove);
static BUS_ATTR_WO(add_single_major);
static BUS_ATTR_WO(remove_single_major);
static BUS_ATTR_RO(supported_features);
static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt if we cannot get memory */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				 u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
				     u64 *snap_features);
static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
/*
 * Return true if nothing else is pending.
 */
static bool pending_result_dec(struct pending_result *pending, int *result)
{
	rbd_assert(pending->num_pending > 0);

	if (*result && !pending->result)
		pending->result = *result;
	if (--pending->num_pending)
		return false;

	*result = pending->result;
	return true;
}
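/*
 * Illustrative sketch (not part of the driver): pending_result
 * aggregates the completions of several child actions -- the first
 * nonzero result sticks, and the caller proceeds only when the count
 * drops to zero.  A hypothetical parent state machine step (callers
 * serialize with the owning state_mutex):
 */
#if 0
static void example_child_done(struct pending_result *pending,
			       int child_result)
{
	int result = child_result;

	if (pending_result_dec(pending, &result)) {
		/* all children done; 'result' is the first error, or 0 */
	}
}
#endif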
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}
static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}
static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ro;

	if (get_user(ro, (int __user *)arg))
		return -EFAULT;

	/* Snapshots can't be marked read-write */
	if (rbd_is_snap(rbd_dev) && !ro)
		return -EROFS;

	/* Let blkdev_roset() handle it */
	return -ENOTTY;
}
static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};
/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}
static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_alloc_size,
	Opt_lock_timeout,
	Opt_last_int,
	/* int args above */
	Opt_pool_ns,
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_notrim,
	Opt_err,
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	{Opt_alloc_size, "alloc_size=%d"},
	{Opt_lock_timeout, "lock_timeout=%d"},
	/* int args above */
	{Opt_pool_ns, "_pool_ns=%s"},
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_notrim, "notrim"},
	{Opt_err, NULL}
};
struct rbd_options {
	int	queue_depth;
	int	alloc_size;
	unsigned long	lock_timeout;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
	bool	trim;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_ALLOC_SIZE_DEFAULT	(64 * 1024)
#define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false
#define RBD_TRIM_DEFAULT	true
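/*
 * Illustrative sketch (not part of the driver): these options arrive in
 * the options field of the /sys/bus/rbd/add string described in
 * Documentation/ABI/testing/sysfs-bus-rbd.  The monitor address, client
 * name and pool/image below are hypothetical placeholders; a userspace
 * mapper would do something like:
 */
#if 0
#include <stdio.h>

static int example_map(void)
{
	FILE *f = fopen("/sys/bus/rbd/add", "w");

	if (!f)
		return -1;
	/* <mon_addrs> <options> <pool> <image> [<snap>] */
	fprintf(f, "1.2.3.4:6789 name=admin,queue_depth=256 rbd myimage");
	return fclose(f);
}
#endif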
struct parse_rbd_opts_ctx {
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
};
static int parse_rbd_opts_token(char *c, void *private)
{
	struct parse_rbd_opts_ctx *pctx = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		pctx->opts->queue_depth = intval;
		break;
	case Opt_alloc_size:
		if (intval < SECTOR_SIZE) {
			pr_err("alloc_size out of range\n");
			return -EINVAL;
		}
		if (!is_power_of_2(intval)) {
			pr_err("alloc_size must be a power of 2\n");
			return -EINVAL;
		}
		pctx->opts->alloc_size = intval;
		break;
	case Opt_lock_timeout:
		/* 0 is "wait forever" (i.e. infinite timeout) */
		if (intval < 0 || intval > INT_MAX / 1000) {
			pr_err("lock_timeout out of range\n");
			return -EINVAL;
		}
		pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
		break;
	case Opt_pool_ns:
		kfree(pctx->spec->pool_ns);
		pctx->spec->pool_ns = match_strdup(argstr);
		if (!pctx->spec->pool_ns)
			return -ENOMEM;
		break;
	case Opt_read_only:
		pctx->opts->read_only = true;
		break;
	case Opt_read_write:
		pctx->opts->read_only = false;
		break;
	case Opt_lock_on_read:
		pctx->opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		pctx->opts->exclusive = true;
		break;
	case Opt_notrim:
		pctx->opts->trim = false;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}
static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	case OBJ_OP_ZEROOUT:
		return "zeroout";
	default:
		return "???";
	}
}
/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret;

	mutex_lock(&client_mutex);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		ceph_destroy_options(ceph_opts);

		/*
		 * Using an existing client.  Make sure ->pg_pools is up to
		 * date before we look up the pool id in do_rbd_add().
		 */
		ret = ceph_wait_for_latest_osdmap(rbdc->client,
					rbdc->client->options->mount_timeout);
		if (ret) {
			rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
			rbd_put_client(rbdc);
			rbdc = ERR_PTR(ret);
		}
	} else {
		rbdc = rbd_client_create(ceph_opts);
	}
	mutex_unlock(&client_mutex);

	return rbdc;
}
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t size;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}
static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}
/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	ceph_put_snap_context(snapc);
	kfree(snap_sizes);
	kfree(snap_names);
	kfree(object_prefix);

	return ret;
}
static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}
/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}
/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
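/*
 * Illustrative sketch (not part of the driver): because the snapshot
 * array is descending, the comparison function is inverted so that
 * bsearch() still works.  For snaps = {40, 30, 10}, looking up 30
 * yields index 1:
 */
#if 0
static void example_snap_index(void)
{
	u64 snaps[] = { 40, 30, 10 };	/* highest id first */
	u64 key = 30;
	u64 *found = bsearch(&key, snaps, 3, sizeof(key),
			     snapid_compare_reverse);

	WARN_ON(!found || found - snaps != 1);
}
#endif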
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}
static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}
static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}
static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}
static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}
static void zero_bvec(struct bio_vec *bv)
{
	void *buf;
	unsigned long flags;

	buf = bvec_kmap_irq(bv, &flags);
	memset(buf, 0, bv->bv_len);
	flush_dcache_page(bv->bv_page);
	bvec_kunmap_irq(buf, &flags);
}
static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
{
	struct ceph_bio_iter it = *bio_pos;

	ceph_bio_iter_advance(&it, off);
	ceph_bio_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
{
	struct ceph_bvec_iter it = *bvec_pos;

	ceph_bvec_iter_advance(&it, off);
	ceph_bvec_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}
/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * (private) bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);

	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		BUG();
	}
}
static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
	     kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
}
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	list_del(&obj_request->ex.oe_item);
	rbd_assert(obj_request->img_request == img_request);
	rbd_obj_request_put(obj_request);
}
static void rbd_osd_submit(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
	     __func__, osd_req, obj_req, obj_req->ex.oe_objno,
	     obj_req->ex.oe_off, obj_req->ex.oe_len);
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return !obj_req->ex.oe_off &&
	       obj_req->ex.oe_len == rbd_dev->layout.object_size;
}

static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return obj_req->ex.oe_off + obj_req->ex.oe_len ==
					rbd_dev->layout.object_size;
}
/*
 * Must be called after rbd_obj_calc_img_extents().
 */
static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
{
	if (!obj_req->num_img_extents ||
	    (rbd_obj_is_entire(obj_req) &&
	     !obj_req->img_request->snapc->num_snaps))
		return false;

	return true;
}
static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
{
	return ceph_file_extents_bytes(obj_req->img_extents,
				       obj_req->num_img_extents);
}
static bool rbd_img_is_write(struct rbd_img_request *img_req)
{
	switch (img_req->op_type) {
	case OBJ_OP_READ:
		return false;
	case OBJ_OP_WRITE:
	case OBJ_OP_DISCARD:
	case OBJ_OP_ZEROOUT:
		return true;
	default:
		BUG();
	}
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;
	int result;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);

	/*
	 * Writes aren't allowed to return a data payload.  In some
	 * guarded write cases (e.g. stat + zero on an empty object)
	 * a stat response makes it through, but we don't care.
	 */
	if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
		result = 0;
	else
		result = osd_req->r_result;

	rbd_obj_handle_request(obj_req, result);
}
static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;

	osd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req->r_snapid = obj_request->img_request->snap_id;
}

static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts64(&osd_req->r_mtime);
	osd_req->r_data_offset = obj_request->ex.oe_off;
}
static struct ceph_osd_request *
__rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
			  struct ceph_snap_context *snapc, int num_ops)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	const char *name_format = rbd_dev->image_format == 1 ?
				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
	int ret;

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
	if (!req)
		return ERR_PTR(-ENOMEM);

	list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
	req->r_callback = rbd_osd_req_callback;
	req->r_priv = obj_req;

	/*
	 * Data objects may be stored in a separate pool, but always in
	 * the same namespace in that pool as the header in its pool.
	 */
	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
	req->r_base_oloc.pool = rbd_dev->layout.pool_id;

	ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
			       rbd_dev->header.object_prefix,
			       obj_req->ex.oe_objno);
	if (ret)
		return ERR_PTR(ret);

	return req;
}
static struct ceph_osd_request *
rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
{
	return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
					 num_ops);
}
static struct rbd_obj_request *rbd_obj_request_create(void)
{
	struct rbd_obj_request *obj_request;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request)
		return NULL;

	ceph_object_extent_init(&obj_request->ex);
	INIT_LIST_HEAD(&obj_request->osd_reqs);
	mutex_init(&obj_request->state_mutex);
	kref_init(&obj_request->kref);

	dout("%s %p\n", __func__, obj_request);
	return obj_request;
}
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_request *osd_req;
	u32 i;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	while (!list_empty(&obj_request->osd_reqs)) {
		osd_req = list_first_entry(&obj_request->osd_reqs,
				    struct ceph_osd_request, r_private_item);
		list_del_init(&osd_req->r_private_item);
		ceph_osdc_put_request(osd_req);
	}

	switch (obj_request->img_request->data_type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_BVECS:
		break;		/* Nothing to do */
	case OBJ_REQUEST_OWN_BVECS:
		kfree(obj_request->bvec_pos.bvecs);
		break;
	default:
		BUG();
	}

	kfree(obj_request->img_extents);
	if (obj_request->copyup_bvecs) {
		for (i = 0; i < obj_request->copyup_bvec_count; i++) {
			if (obj_request->copyup_bvecs[i].bv_page)
				__free_page(obj_request->copyup_bvecs[i].bv_page);
		}
		kfree(obj_request->copyup_bvecs);
	}

	kmem_cache_free(rbd_obj_request_cache, obj_request);
}
/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}
/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}
/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	down_read(&rbd_dev->header_rwsem);
	if (rbd_dev->parent_overlap)
		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	up_read(&rbd_dev->header_rwsem);

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return counter > 0;
}
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rbd_dev = rbd_dev;
	img_request->op_type = op_type;
	if (!rbd_img_is_write(img_request))
		img_request->snap_id = rbd_dev->spec->snap_id;
	else
		img_request->snapc = snapc;

	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);

	INIT_LIST_HEAD(&img_request->lock_item);
	INIT_LIST_HEAD(&img_request->object_extents);
	mutex_init(&img_request->state_mutex);
	kref_init(&img_request->kref);

	return img_request;
}
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	WARN_ON(!list_empty(&img_request->lock_item));
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (rbd_img_is_write(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}
#define BITS_PER_OBJ	2
#define OBJS_PER_BYTE	(BITS_PER_BYTE / BITS_PER_OBJ)
#define OBJ_MASK	((1 << BITS_PER_OBJ) - 1)
static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
				   u64 *index, u8 *shift)
{
	u32 off;

	rbd_assert(objno < rbd_dev->object_map_size);
	*index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
	*shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
}
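/*
 * Illustrative sketch (not part of the driver): with BITS_PER_OBJ == 2,
 * each object map byte packs four object states, most significant pair
 * first.  Object 5 therefore lives in byte 1, at bit shift 4:
 *
 *   index = 5 / 4 = 1
 *   off   = 5 % 4 = 1
 *   shift = (4 - 1 - 1) * 2 = 4
 */
#if 0
static void example_object_map_index(void)
{
	u64 index;
	u8 shift;
	u32 off;

	index = div_u64_rem(5, OBJS_PER_BYTE, &off);
	shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
	WARN_ON(index != 1 || shift != 4);
}
#endif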
static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
{
	u64 index;
	u8 shift;

	lockdep_assert_held(&rbd_dev->object_map_lock);
	__rbd_object_map_index(rbd_dev, objno, &index, &shift);
	return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
}
static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
{
	u64 index;
	u8 shift;
	u8 *p;

	lockdep_assert_held(&rbd_dev->object_map_lock);
	rbd_assert(!(val & ~OBJ_MASK));

	__rbd_object_map_index(rbd_dev, objno, &index, &shift);
	p = &rbd_dev->object_map[index];
	*p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
}
static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
{
	u8 state;

	spin_lock(&rbd_dev->object_map_lock);
	state = __rbd_object_map_get(rbd_dev, objno);
	spin_unlock(&rbd_dev->object_map_lock);
	return state;
}
static bool use_object_map(struct rbd_device *rbd_dev)
{
	return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
		!(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
}
static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
{
	u8 state;

	/* fall back to default logic if object map is disabled or invalid */
	if (!use_object_map(rbd_dev))
		return true;

	state = rbd_object_map_get(rbd_dev, objno);
	return state != OBJECT_NONEXISTENT;
}
static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
				struct ceph_object_id *oid)
{
	if (snap_id == CEPH_NOSNAP)
		ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
				rbd_dev->spec->image_id);
	else
		ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
				rbd_dev->spec->image_id, snap_id);
}
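/*
 * Illustrative sketch (not part of the driver): for a hypothetical
 * image_id of "abc123", and assuming RBD_OBJECT_MAP_PREFIX is
 * "rbd_object_map." as defined in rbd_types.h, the HEAD object map is
 * named "rbd_object_map.abc123" and the map for snapshot id 64 is
 * "rbd_object_map.abc123.0000000000000040".
 */
#if 0
static void example_object_map_name(void)
{
	char name[64];

	/* snap_id 64 -> ".0000000000000040" suffix */
	snprintf(name, sizeof(name), "%s%s.%016llx",
		 RBD_OBJECT_MAP_PREFIX, "abc123", 64ULL);
}
#endif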
static int rbd_object_map_lock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	CEPH_DEFINE_OID_ONSTACK(oid);
	u8 lock_type;
	char *lock_tag;
	struct ceph_locker *lockers;
	u32 num_lockers;
	bool broke_lock = false;
	int ret;

	rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);

again:
	ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
			    CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
	if (ret != -EBUSY || broke_lock) {
		if (ret == -EEXIST)
			ret = 0; /* already locked by myself */
		if (ret)
			rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
		return ret;
	}

	ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
				 RBD_LOCK_NAME, &lock_type, &lock_tag,
				 &lockers, &num_lockers);
	if (ret) {
		if (ret == -ENOENT)
			goto again;

		rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
		return ret;
	}

	kfree(lock_tag);
	if (num_lockers == 0)
		goto again;

	rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
		 ENTITY_NAME(lockers[0].id.name));

	ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
				  RBD_LOCK_NAME, lockers[0].id.cookie,
				  &lockers[0].id.name);
	ceph_free_lockers(lockers, num_lockers);
	if (ret) {
		if (ret == -ENOENT)
			goto again;

		rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
		return ret;
	}

	broke_lock = true;
	goto again;
}
static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	CEPH_DEFINE_OID_ONSTACK(oid);
	int ret;

	rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);

	ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
			      "");
	if (ret && ret != -ENOENT)
		rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
}
static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
{
	u8 struct_v;
	u32 struct_len;
	u32 header_len;
	void *header_end;
	int ret;

	ceph_decode_32_safe(p, end, header_len, e_inval);
	header_end = *p + header_len;

	ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
				  &struct_len);
	if (ret)
		return ret;

	ceph_decode_64_safe(p, end, *object_map_size, e_inval);

	*p = header_end;
	return 0;

e_inval:
	return -EINVAL;
}
static int __rbd_object_map_load(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	CEPH_DEFINE_OID_ONSTACK(oid);
	struct page **pages;
	void *p, *end;
	size_t reply_len;
	u64 num_objects;
	u64 object_map_bytes;
	u64 object_map_size;
	int num_pages;
	int ret;

	rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);

	num_objects = ceph_get_num_objects(&rbd_dev->layout,
					   rbd_dev->mapping.size);
	object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
					    BITS_PER_BYTE);
	num_pages = calc_pages_for(0, object_map_bytes) + 1;
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	reply_len = num_pages * PAGE_SIZE;
	rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
	ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
			     "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
			     NULL, 0, pages, &reply_len);
	if (ret)
		goto out;

	p = page_address(pages[0]);
	end = p + min(reply_len, (size_t)PAGE_SIZE);
	ret = decode_object_map_header(&p, end, &object_map_size);
	if (ret)
		goto out;

	if (object_map_size != num_objects) {
		rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
			 object_map_size, num_objects);
		ret = -EINVAL;
		goto out;
	}

	if (offset_in_page(p) + object_map_bytes > reply_len) {
		ret = -EINVAL;
		goto out;
	}

	rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
	if (!rbd_dev->object_map) {
		ret = -ENOMEM;
		goto out;
	}

	rbd_dev->object_map_size = object_map_size;
	ceph_copy_from_page_vector(pages, rbd_dev->object_map,
				   offset_in_page(p), object_map_bytes);

out:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
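/*
 * Illustrative sketch (not part of the driver): a 1 GiB mapping with
 * 4 MiB objects has 256 objects; at 2 bits per object the on-wire bit
 * vector payload is DIV_ROUND_UP(256 * 2, 8) = 64 bytes.
 */
#if 0
static void example_object_map_sizing(void)
{
	u64 num_objects = 256;
	u64 object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
						BITS_PER_BYTE);

	WARN_ON(object_map_bytes != 64);
}
#endif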
static void rbd_object_map_free(struct rbd_device *rbd_dev)
{
	kvfree(rbd_dev->object_map);
	rbd_dev->object_map = NULL;
	rbd_dev->object_map_size = 0;
}
static int rbd_object_map_load(struct rbd_device *rbd_dev)
{
	int ret;

	ret = __rbd_object_map_load(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_v2_get_flags(rbd_dev);
	if (ret) {
		rbd_object_map_free(rbd_dev);
		return ret;
	}

	if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
		rbd_warn(rbd_dev, "object map is invalid");

	return 0;
}
static int rbd_object_map_open(struct rbd_device *rbd_dev)
{
	int ret;

	ret = rbd_object_map_lock(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_object_map_load(rbd_dev);
	if (ret) {
		rbd_object_map_unlock(rbd_dev);
		return ret;
	}

	return 0;
}
static void rbd_object_map_close(struct rbd_device *rbd_dev)
{
	rbd_object_map_free(rbd_dev);
	rbd_object_map_unlock(rbd_dev);
}
/*
 * This function needs snap_id (or more precisely just something to
 * distinguish between HEAD and snapshot object maps), new_state and
 * current_state that were passed to rbd_object_map_update().
 *
 * To avoid allocating and stashing a context we piggyback on the OSD
 * request.  A HEAD update has two ops (assert_locked).  For new_state
 * and current_state we decode our own object_map_update op, encoded in
 * rbd_cls_object_map_update().
 */
static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
					struct ceph_osd_request *osd_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	struct ceph_osd_data *osd_data;
	u64 objno;
	u8 state, new_state, uninitialized_var(current_state);
	bool has_current_state;
	void *p;

	if (osd_req->r_result)
		return osd_req->r_result;

	/*
	 * Nothing to do for a snapshot object map.
	 */
	if (osd_req->r_num_ops == 1)
		return 0;

	/*
	 * Update in-memory HEAD object map.
	 */
	rbd_assert(osd_req->r_num_ops == 2);
	osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
	rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);

	p = page_address(osd_data->pages[0]);
	objno = ceph_decode_64(&p);
	rbd_assert(objno == obj_req->ex.oe_objno);
	rbd_assert(ceph_decode_64(&p) == objno + 1);
	new_state = ceph_decode_8(&p);
	has_current_state = ceph_decode_8(&p);
	if (has_current_state)
		current_state = ceph_decode_8(&p);

	spin_lock(&rbd_dev->object_map_lock);
	state = __rbd_object_map_get(rbd_dev, objno);
	if (!has_current_state || current_state == state ||
	    (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
		__rbd_object_map_set(rbd_dev, objno, new_state);
	spin_unlock(&rbd_dev->object_map_lock);

	return 0;
}
static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;
	int result;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);

	result = rbd_object_map_update_finish(obj_req, osd_req);
	rbd_obj_handle_request(obj_req, result);
}
static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
{
	u8 state = rbd_object_map_get(rbd_dev, objno);

	if (state == new_state ||
	    (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
	    (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
		return false;

	return true;
}
static int rbd_cls_object_map_update(struct ceph_osd_request *req,
				     int which, u64 objno, u8 new_state,
				     const u8 *current_state)
{
	struct page **pages;
	void *p, *start;
	int ret;

	ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
	if (ret)
		return ret;

	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	p = start = page_address(pages[0]);
	ceph_encode_64(&p, objno);
	ceph_encode_64(&p, objno + 1);
	ceph_encode_8(&p, new_state);
	if (current_state) {
		ceph_encode_8(&p, 1);
		ceph_encode_8(&p, *current_state);
	} else {
		ceph_encode_8(&p, 0);
	}

	osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
					  false, true);
	return 0;
}
/*
 * Return:
 *   0 - object map update sent
 *   1 - object map update isn't needed
 *  <0 - error
 */
static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
				 u8 new_state, const u8 *current_state)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	int num_ops = 1;
	int which = 0;
	int ret;

	if (snap_id == CEPH_NOSNAP) {
		if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
			return 1;

		num_ops++; /* assert_locked */
	}

	req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
	if (!req)
		return -ENOMEM;

	list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
	req->r_callback = rbd_object_map_callback;
	req->r_priv = obj_req;

	rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
	ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
	req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts64(&req->r_mtime);

	if (snap_id == CEPH_NOSNAP) {
		/*
		 * Protect against possible race conditions during lock
		 * ownership transitions.
		 */
		ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
					     CEPH_CLS_LOCK_EXCLUSIVE, "", "");
		if (ret)
			return ret;
	}

	ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
					new_state, current_state);
	if (ret)
		return ret;

	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
	if (ret)
		return ret;

	ceph_osdc_start_request(osdc, req, false);
	return 0;
}
static void prune_extents(struct ceph_file_extent *img_extents,
			  u32 *num_img_extents, u64 overlap)
{
	u32 cnt = *num_img_extents;

	/* drop extents completely beyond the overlap */
	while (cnt && img_extents[cnt - 1].fe_off >= overlap)
		cnt--;

	if (cnt) {
		struct ceph_file_extent *ex = &img_extents[cnt - 1];

		/* trim final overlapping extent */
		if (ex->fe_off + ex->fe_len > overlap)
			ex->fe_len = overlap - ex->fe_off;
	}

	*num_img_extents = cnt;
}
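/*
 * Illustrative sketch (not part of the driver): with a parent overlap
 * of 100 bytes, extents {90~20, 120~10} are pruned to the single extent
 * 90~10 -- the second extent starts past the overlap and is dropped,
 * and the first is trimmed to end at the overlap.
 */
#if 0
static void example_prune_extents(void)
{
	struct ceph_file_extent extents[] = {
		{ .fe_off = 90, .fe_len = 20 },
		{ .fe_off = 120, .fe_len = 10 },
	};
	u32 cnt = 2;

	prune_extents(extents, &cnt, 100);
	WARN_ON(cnt != 1 || extents[0].fe_len != 10);
}
#endif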
/*
 * Determine the byte range(s) covered by either just the object extent
 * or the entire object in the parent image.
 */
static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
				    bool entire)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

	if (!rbd_dev->parent_overlap)
		return 0;

	ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
				  entire ? 0 : obj_req->ex.oe_off,
				  entire ? rbd_dev->layout.object_size :
							obj_req->ex.oe_len,
				  &obj_req->img_extents,
				  &obj_req->num_img_extents);
	if (ret)
		return ret;

	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
		      rbd_dev->parent_overlap);
	return 0;
}
static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		osd_req_op_extent_osd_data_bio(osd_req, which,
					       &obj_req->bio_pos,
					       obj_req->ex.oe_len);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
							obj_req->ex.oe_len);
		rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
		osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
						    &obj_req->bvec_pos);
		break;
	default:
		BUG();
	}
}
static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
{
	struct page **pages;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
	osd_req_op_raw_data_in_pages(osd_req, which, pages,
				     8 + sizeof(struct ceph_timespec),
				     0, false, true);
	return 0;
}
static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
				u32 bytes)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;
	int ret;

	ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
	if (ret)
		return ret;

	osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
					  obj_req->copyup_bvec_count, bytes);
	return 0;
}
static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
{
	obj_req->read_state = RBD_OBJ_READ_START;
	return 0;
}
static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
				      int which)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	u16 opcode;

	if (!use_object_map(rbd_dev) ||
	    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
		osd_req_op_alloc_hint_init(osd_req, which++,
					   rbd_dev->layout.object_size,
					   rbd_dev->layout.object_size);
	}

	if (rbd_obj_is_entire(obj_req))
		opcode = CEPH_OSD_OP_WRITEFULL;
	else
		opcode = CEPH_OSD_OP_WRITE;

	osd_req_op_extent_init(osd_req, which, opcode,
			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
	rbd_osd_setup_data(osd_req, which);
}
static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
{
	int ret;

	/* reverse map the entire object onto the parent */
	ret = rbd_obj_calc_img_extents(obj_req, true);
	if (ret)
		return ret;

	if (rbd_obj_copyup_enabled(obj_req))
		obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;

	obj_req->write_state = RBD_OBJ_WRITE_START;
	return 0;
}
static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
{
	return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
					  CEPH_OSD_OP_ZERO;
}
static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
					int which)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
		rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
		osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
	} else {
		osd_req_op_extent_init(osd_req, which,
				       truncate_or_zero_opcode(obj_req),
				       obj_req->ex.oe_off, obj_req->ex.oe_len,
				       0, 0);
	}
}
2432 static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2434 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2439 * Align the range to alloc_size boundary and punt on discards
2440 * that are too small to free up any space.
2442 * alloc_size == object_size && is_tail() is a special case for
2443 * filestore with filestore_punch_hole = false, needed to allow
2444 * truncate (in addition to delete).
2446 if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2447 !rbd_obj_is_tail(obj_req)) {
2448 off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2449 next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2450 rbd_dev->opts->alloc_size);
2451 if (off >= next_off)
2454 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2455 obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2456 off, next_off - off);
2457 obj_req->ex.oe_off = off;
2458 obj_req->ex.oe_len = next_off - off;
2461 /* reverse map the entire object onto the parent */
2462 ret = rbd_obj_calc_img_extents(obj_req, true);
2466 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2467 if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2468 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2470 obj_req->write_state = RBD_OBJ_WRITE_START;
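/*
 * Editor's sketch (standalone, not driver code): the alloc_size
 * rounding performed above, modelled in plain C.  With an assumed
 * alloc_size of 65536, a discard of 1000~200000 is trimmed to
 * 65536~131072, while a discard of 1000~50000 rounds away to nothing
 * and is punted.
 */
#include <stdint.h>
#include <stdio.h>

#define EX_ROUND_UP(x, a)	((((x) + (a) - 1) / (a)) * (a))
#define EX_ROUND_DOWN(x, a)	(((x) / (a)) * (a))

static void ex_align_discard(uint64_t off, uint64_t len, uint64_t alloc_size)
{
	uint64_t new_off = EX_ROUND_UP(off, alloc_size);
	uint64_t next_off = EX_ROUND_DOWN(off + len, alloc_size);

	if (new_off >= next_off)	/* too small to free any space */
		printf("%ju~%ju -> punt\n", (uintmax_t)off, (uintmax_t)len);
	else
		printf("%ju~%ju -> %ju~%ju\n", (uintmax_t)off, (uintmax_t)len,
		       (uintmax_t)new_off, (uintmax_t)(next_off - new_off));
}

int main(void)
{
	ex_align_discard(1000, 200000, 65536);	/* -> 65536~131072 */
	ex_align_discard(1000, 50000, 65536);	/* -> punt */
	return 0;
}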
2474 static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2477 struct rbd_obj_request *obj_req = osd_req->r_priv;
2480 if (rbd_obj_is_entire(obj_req)) {
2481 if (obj_req->num_img_extents) {
2482 if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2483 osd_req_op_init(osd_req, which++,
2484 CEPH_OSD_OP_CREATE, 0);
2485 opcode = CEPH_OSD_OP_TRUNCATE;
2487 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2488 osd_req_op_init(osd_req, which++,
2489 CEPH_OSD_OP_DELETE, 0);
2493 opcode = truncate_or_zero_opcode(obj_req);
2497 osd_req_op_extent_init(osd_req, which, opcode,
2498 obj_req->ex.oe_off, obj_req->ex.oe_len,
2502 static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2506 /* reverse map the entire object onto the parent */
2507 ret = rbd_obj_calc_img_extents(obj_req, true);
2511 if (rbd_obj_copyup_enabled(obj_req))
2512 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2513 if (!obj_req->num_img_extents) {
2514 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2515 if (rbd_obj_is_entire(obj_req))
2516 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2519 obj_req->write_state = RBD_OBJ_WRITE_START;
2523 static int count_write_ops(struct rbd_obj_request *obj_req)
2525 struct rbd_img_request *img_req = obj_req->img_request;
2527 switch (img_req->op_type) {
2529 if (!use_object_map(img_req->rbd_dev) ||
2530 !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2531 return 2; /* setallochint + write/writefull */
2533 return 1; /* write/writefull */
2534 case OBJ_OP_DISCARD:
2535 return 1; /* delete/truncate/zero */
2536 case OBJ_OP_ZEROOUT:
2537 if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2538 !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2539 return 2; /* create + truncate */
2541 return 1; /* delete/truncate/zero */
2547 static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2550 struct rbd_obj_request *obj_req = osd_req->r_priv;
2552 switch (obj_req->img_request->op_type) {
2554 __rbd_osd_setup_write_ops(osd_req, which);
2556 case OBJ_OP_DISCARD:
2557 __rbd_osd_setup_discard_ops(osd_req, which);
2559 case OBJ_OP_ZEROOUT:
2560 __rbd_osd_setup_zeroout_ops(osd_req, which);
2568 * Prune the list of object requests (adjust offset and/or length, drop
2569 * redundant requests). Prepare object request state machines and image
2570 * request state machine for execution.
2572 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2574 struct rbd_obj_request *obj_req, *next_obj_req;
2577 for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2578 switch (img_req->op_type) {
2580 ret = rbd_obj_init_read(obj_req);
2583 ret = rbd_obj_init_write(obj_req);
2585 case OBJ_OP_DISCARD:
2586 ret = rbd_obj_init_discard(obj_req);
2588 case OBJ_OP_ZEROOUT:
2589 ret = rbd_obj_init_zeroout(obj_req);
2597 rbd_img_obj_request_del(img_req, obj_req);
2602 img_req->state = RBD_IMG_START;
2606 union rbd_img_fill_iter {
2607 struct ceph_bio_iter bio_iter;
2608 struct ceph_bvec_iter bvec_iter;
2611 struct rbd_img_fill_ctx {
2612 enum obj_request_type pos_type;
2613 union rbd_img_fill_iter *pos;
2614 union rbd_img_fill_iter iter;
2615 ceph_object_extent_fn_t set_pos_fn;
2616 ceph_object_extent_fn_t count_fn;
2617 ceph_object_extent_fn_t copy_fn;
2620 static struct ceph_object_extent *alloc_object_extent(void *arg)
2622 struct rbd_img_request *img_req = arg;
2623 struct rbd_obj_request *obj_req;
2625 obj_req = rbd_obj_request_create();
2629 rbd_img_obj_request_add(img_req, obj_req);
2630 return &obj_req->ex;
2634 * While su != os && sc == 1 is technically not fancy (it's the same
2635 * layout as su == os && sc == 1), we can't use the nocopy path for it
2636 * because ->set_pos_fn() should be called only once per object.
2637 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2638 * treat su != os && sc == 1 as fancy.
2640 static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2642 return l->stripe_unit != l->object_size;
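/*
 * Editor's sketch (standalone, not driver code): in the non-fancy
 * case above (stripe unit == object size, stripe count == 1), mapping
 * an image offset to a backing object is a plain div/mod.  The names
 * here are hypothetical; the real mapping, including fancy striping,
 * is done by ceph_file_to_extents() in libceph.
 */
#include <stdint.h>

static void ex_map_nonfancy(uint64_t img_off, uint64_t object_size,
			    uint64_t *objno, uint64_t *obj_off)
{
	*objno = img_off / object_size;		/* which object */
	*obj_off = img_off % object_size;	/* offset within it */
}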
2645 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2646 struct ceph_file_extent *img_extents,
2647 u32 num_img_extents,
2648 struct rbd_img_fill_ctx *fctx)
2653 img_req->data_type = fctx->pos_type;
2656 * Create object requests and set each object request's starting
2657 * position in the provided bio (list) or bio_vec array.
2659 fctx->iter = *fctx->pos;
2660 for (i = 0; i < num_img_extents; i++) {
2661 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2662 img_extents[i].fe_off,
2663 img_extents[i].fe_len,
2664 &img_req->object_extents,
2665 alloc_object_extent, img_req,
2666 fctx->set_pos_fn, &fctx->iter);
2671 return __rbd_img_fill_request(img_req);
2675 * Map a list of image extents to a list of object extents, create the
2676 * corresponding object requests (normally each to a different object,
2677 * but not always) and add them to @img_req. For each object request,
2678 * set up its data descriptor to point to the corresponding chunk(s) of
2679 * @fctx->pos data buffer.
2681 * Because ceph_file_to_extents() will merge adjacent object extents
2682 * together, each object request's data descriptor may point to multiple
2683 * different chunks of @fctx->pos data buffer.
2685 * @fctx->pos data buffer is assumed to be large enough.
2687 static int rbd_img_fill_request(struct rbd_img_request *img_req,
2688 struct ceph_file_extent *img_extents,
2689 u32 num_img_extents,
2690 struct rbd_img_fill_ctx *fctx)
2692 struct rbd_device *rbd_dev = img_req->rbd_dev;
2693 struct rbd_obj_request *obj_req;
2697 if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2698 !rbd_layout_is_fancy(&rbd_dev->layout))
2699 return rbd_img_fill_request_nocopy(img_req, img_extents,
2700 num_img_extents, fctx);
2702 img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2705 * Create object requests and determine ->bvec_count for each object
2706 * request. Note that ->bvec_count sum over all object requests may
2707 * be greater than the number of bio_vecs in the provided bio (list)
2708 * or bio_vec array because when mapped, those bio_vecs can straddle
2709 * stripe unit boundaries.
2711 fctx->iter = *fctx->pos;
2712 for (i = 0; i < num_img_extents; i++) {
2713 ret = ceph_file_to_extents(&rbd_dev->layout,
2714 img_extents[i].fe_off,
2715 img_extents[i].fe_len,
2716 &img_req->object_extents,
2717 alloc_object_extent, img_req,
2718 fctx->count_fn, &fctx->iter);
2723 for_each_obj_request(img_req, obj_req) {
2724 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2725 sizeof(*obj_req->bvec_pos.bvecs),
2727 if (!obj_req->bvec_pos.bvecs)
2732 * Fill in each object request's private bio_vec array, splitting and
2733 * rearranging the provided bio_vecs in stripe unit chunks as needed.
2735 fctx->iter = *fctx->pos;
2736 for (i = 0; i < num_img_extents; i++) {
2737 ret = ceph_iterate_extents(&rbd_dev->layout,
2738 img_extents[i].fe_off,
2739 img_extents[i].fe_len,
2740 &img_req->object_extents,
2741 fctx->copy_fn, &fctx->iter);
2746 return __rbd_img_fill_request(img_req);
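/*
 * Editor's sketch (standalone, not driver code): the count-then-copy
 * pattern used above, reduced to sorting non-negative integers into
 * buckets.  Pass 1 only counts, so that each bucket's array can be
 * allocated exactly once at the right size; pass 2 re-walks the same
 * input and copies.  Buckets are assumed zero-initialized.
 */
#include <stdlib.h>

struct ex_bucket {
	int count;	/* filled by pass 1 */
	int *items;	/* allocated between the passes */
	int idx;	/* fill cursor for pass 2 */
};

static int ex_two_pass_fill(struct ex_bucket *b, int nbuckets,
			    const int *in, int n)
{
	int i;

	for (i = 0; i < n; i++)			/* pass 1: count */
		b[in[i] % nbuckets].count++;

	for (i = 0; i < nbuckets; i++) {	/* exact allocations */
		b[i].items = malloc(b[i].count * sizeof(int));
		if (b[i].count && !b[i].items)
			return -1;
	}

	for (i = 0; i < n; i++) {		/* pass 2: copy */
		struct ex_bucket *dst = &b[in[i] % nbuckets];

		dst->items[dst->idx++] = in[i];
	}
	return 0;
}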
2749 static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2752 struct ceph_file_extent ex = { off, len };
2753 union rbd_img_fill_iter dummy;
2754 struct rbd_img_fill_ctx fctx = {
2755 .pos_type = OBJ_REQUEST_NODATA,
2759 return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2762 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2764 struct rbd_obj_request *obj_req =
2765 container_of(ex, struct rbd_obj_request, ex);
2766 struct ceph_bio_iter *it = arg;
2768 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2769 obj_req->bio_pos = *it;
2770 ceph_bio_iter_advance(it, bytes);
2773 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2775 struct rbd_obj_request *obj_req =
2776 container_of(ex, struct rbd_obj_request, ex);
2777 struct ceph_bio_iter *it = arg;
2779 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2780 ceph_bio_iter_advance_step(it, bytes, ({
2781 obj_req->bvec_count++;
2786 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2788 struct rbd_obj_request *obj_req =
2789 container_of(ex, struct rbd_obj_request, ex);
2790 struct ceph_bio_iter *it = arg;
2792 dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2793 ceph_bio_iter_advance_step(it, bytes, ({
2794 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2795 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2799 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2800 struct ceph_file_extent *img_extents,
2801 u32 num_img_extents,
2802 struct ceph_bio_iter *bio_pos)
2804 struct rbd_img_fill_ctx fctx = {
2805 .pos_type = OBJ_REQUEST_BIO,
2806 .pos = (union rbd_img_fill_iter *)bio_pos,
2807 .set_pos_fn = set_bio_pos,
2808 .count_fn = count_bio_bvecs,
2809 .copy_fn = copy_bio_bvecs,
2812 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2816 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2817 u64 off, u64 len, struct bio *bio)
2819 struct ceph_file_extent ex = { off, len };
2820 struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2822 return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2825 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2827 struct rbd_obj_request *obj_req =
2828 container_of(ex, struct rbd_obj_request, ex);
2829 struct ceph_bvec_iter *it = arg;
2831 obj_req->bvec_pos = *it;
2832 ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2833 ceph_bvec_iter_advance(it, bytes);
2836 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2838 struct rbd_obj_request *obj_req =
2839 container_of(ex, struct rbd_obj_request, ex);
2840 struct ceph_bvec_iter *it = arg;
2842 ceph_bvec_iter_advance_step(it, bytes, ({
2843 obj_req->bvec_count++;
2847 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2849 struct rbd_obj_request *obj_req =
2850 container_of(ex, struct rbd_obj_request, ex);
2851 struct ceph_bvec_iter *it = arg;
2853 ceph_bvec_iter_advance_step(it, bytes, ({
2854 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2855 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2859 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2860 struct ceph_file_extent *img_extents,
2861 u32 num_img_extents,
2862 struct ceph_bvec_iter *bvec_pos)
2864 struct rbd_img_fill_ctx fctx = {
2865 .pos_type = OBJ_REQUEST_BVECS,
2866 .pos = (union rbd_img_fill_iter *)bvec_pos,
2867 .set_pos_fn = set_bvec_pos,
2868 .count_fn = count_bvecs,
2869 .copy_fn = copy_bvecs,
2872 return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2876 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2877 struct ceph_file_extent *img_extents,
2878 u32 num_img_extents,
2879 struct bio_vec *bvecs)
2881 struct ceph_bvec_iter it = {
2883 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2887 return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2891 static void rbd_img_handle_request_work(struct work_struct *work)
2893 struct rbd_img_request *img_req =
2894 container_of(work, struct rbd_img_request, work);
2896 rbd_img_handle_request(img_req, img_req->work_result);
2899 static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2901 INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2902 img_req->work_result = result;
2903 queue_work(rbd_wq, &img_req->work);
2906 static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2908 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2910 if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2911 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2915 dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2916 obj_req->ex.oe_objno);
2920 static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2922 struct ceph_osd_request *osd_req;
2925 osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2926 if (IS_ERR(osd_req))
2927 return PTR_ERR(osd_req);
2929 osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2930 obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2931 rbd_osd_setup_data(osd_req, 0);
2932 rbd_osd_format_read(osd_req);
2934 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2938 rbd_osd_submit(osd_req);
2942 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2944 struct rbd_img_request *img_req = obj_req->img_request;
2945 struct rbd_img_request *child_img_req;
2948 child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2953 __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2954 child_img_req->obj_request = obj_req;
2956 dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2959 if (!rbd_img_is_write(img_req)) {
2960 switch (img_req->data_type) {
2961 case OBJ_REQUEST_BIO:
2962 ret = __rbd_img_fill_from_bio(child_img_req,
2963 obj_req->img_extents,
2964 obj_req->num_img_extents,
2967 case OBJ_REQUEST_BVECS:
2968 case OBJ_REQUEST_OWN_BVECS:
2969 ret = __rbd_img_fill_from_bvecs(child_img_req,
2970 obj_req->img_extents,
2971 obj_req->num_img_extents,
2972 &obj_req->bvec_pos);
2978 ret = rbd_img_fill_from_bvecs(child_img_req,
2979 obj_req->img_extents,
2980 obj_req->num_img_extents,
2981 obj_req->copyup_bvecs);
2984 rbd_img_request_put(child_img_req);
2988 /* avoid parent chain recursion */
2989 rbd_img_schedule(child_img_req, 0);
2993 static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2995 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2999 switch (obj_req->read_state) {
3000 case RBD_OBJ_READ_START:
3001 rbd_assert(!*result);
3003 if (!rbd_obj_may_exist(obj_req)) {
3005 obj_req->read_state = RBD_OBJ_READ_OBJECT;
3009 ret = rbd_obj_read_object(obj_req);
3014 obj_req->read_state = RBD_OBJ_READ_OBJECT;
3016 case RBD_OBJ_READ_OBJECT:
3017 if (*result == -ENOENT && rbd_dev->parent_overlap) {
3018 /* reverse map this object extent onto the parent */
3019 ret = rbd_obj_calc_img_extents(obj_req, false);
3024 if (obj_req->num_img_extents) {
3025 ret = rbd_obj_read_from_parent(obj_req);
3030 obj_req->read_state = RBD_OBJ_READ_PARENT;
3036 * -ENOENT means a hole in the image -- zero-fill the entire
3037 * length of the request. A short read also implies zero-fill
3038 * to the end of the request.
3040 if (*result == -ENOENT) {
3041 rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
3043 } else if (*result >= 0) {
3044 if (*result < obj_req->ex.oe_len)
3045 rbd_obj_zero_range(obj_req, *result,
3046 obj_req->ex.oe_len - *result);
3048 rbd_assert(*result == obj_req->ex.oe_len);
3052 case RBD_OBJ_READ_PARENT:
3054 * The parent image is read only up to the overlap -- zero-fill
3055 * from the overlap to the end of the request.
3058 u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
3060 if (obj_overlap < obj_req->ex.oe_len)
3061 rbd_obj_zero_range(obj_req, obj_overlap,
3062 obj_req->ex.oe_len - obj_overlap);
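/*
 * Editor's worked example for the zero-fill rules above, assuming a
 * 4 MiB object: a read of 0~4194304 that returns 1048576 bytes is
 * zero-filled over 1048576~3145728, and a parent read against a
 * 1048576-byte overlap is zero-filled over the same range.
 */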
3070 static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
3072 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3074 if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
3075 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
3077 if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
3078 (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
3079 dout("%s %p noop for nonexistent\n", __func__, obj_req);
 * Return:
 *   0 - object map update sent
 *   1 - object map update isn't needed
 *   <0 - error occurred
3092 static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
3094 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3097 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3100 if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3101 new_state = OBJECT_PENDING;
3103 new_state = OBJECT_EXISTS;
3105 return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
3108 static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
3110 struct ceph_osd_request *osd_req;
3111 int num_ops = count_write_ops(obj_req);
3115 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
3116 num_ops++; /* stat */
3118 osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3119 if (IS_ERR(osd_req))
3120 return PTR_ERR(osd_req);
3122 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3123 ret = rbd_osd_setup_stat(osd_req, which++);
3128 rbd_osd_setup_write_ops(osd_req, which);
3129 rbd_osd_format_write(osd_req);
3131 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3135 rbd_osd_submit(osd_req);
3140 * copyup_bvecs pages are never highmem pages
3142 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
3144 struct ceph_bvec_iter it = {
3146 .iter = { .bi_size = bytes },
3149 ceph_bvec_iter_advance_step(&it, bytes, ({
3150 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
3157 #define MODS_ONLY U32_MAX
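/*
 * Editor's sketch (standalone, not driver code): memchr_inv(), used
 * by is_zero_bvecs() above, returns the first byte that differs from
 * the given pattern, or NULL if the whole range matches.  A plain-C
 * equivalent of the zero scan:
 */
#include <stddef.h>

static const void *ex_memchr_inv_zero(const void *buf, size_t len)
{
	const unsigned char *p = buf;
	size_t i;

	for (i = 0; i < len; i++)
		if (p[i] != 0)
			return p + i;	/* first non-zero byte */
	return NULL;			/* all zeros */
}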
3159 static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3162 struct ceph_osd_request *osd_req;
3165 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3166 rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3168 osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3169 if (IS_ERR(osd_req))
3170 return PTR_ERR(osd_req);
3172 ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3176 rbd_osd_format_write(osd_req);
3178 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3182 rbd_osd_submit(osd_req);
3186 static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3189 struct ceph_osd_request *osd_req;
3190 int num_ops = count_write_ops(obj_req);
3194 dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3196 if (bytes != MODS_ONLY)
3197 num_ops++; /* copyup */
3199 osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3200 if (IS_ERR(osd_req))
3201 return PTR_ERR(osd_req);
3203 if (bytes != MODS_ONLY) {
3204 ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3209 rbd_osd_setup_write_ops(osd_req, which);
3210 rbd_osd_format_write(osd_req);
3212 ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3216 rbd_osd_submit(osd_req);
3220 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3224 rbd_assert(!obj_req->copyup_bvecs);
3225 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3226 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3227 sizeof(*obj_req->copyup_bvecs),
3229 if (!obj_req->copyup_bvecs)
3232 for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3233 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3235 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3236 if (!obj_req->copyup_bvecs[i].bv_page)
3239 obj_req->copyup_bvecs[i].bv_offset = 0;
3240 obj_req->copyup_bvecs[i].bv_len = len;
		obj_overlap -= len;
	}

	rbd_assert(!obj_overlap);
3249 * The target object doesn't exist. Read the data for the entire
3250 * target object up to the overlap point (if any) from the parent,
3251 * so we can use it for a copyup.
3253 static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3255 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3258 rbd_assert(obj_req->num_img_extents);
3259 prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3260 rbd_dev->parent_overlap);
3261 if (!obj_req->num_img_extents) {
3263 * The overlap has become 0 (most likely because the
3264 * image has been flattened). Re-submit the original write
		 * request -- pass MODS_ONLY since the copyup isn't needed
		 * anymore.
3268 return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3271 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3275 return rbd_obj_read_from_parent(obj_req);
3278 static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3280 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3281 struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3286 rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3288 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3291 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3294 for (i = 0; i < snapc->num_snaps; i++) {
3295 if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3296 i + 1 < snapc->num_snaps)
3297 new_state = OBJECT_EXISTS_CLEAN;
3299 new_state = OBJECT_EXISTS;
3301 ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3304 obj_req->pending.result = ret;
3309 obj_req->pending.num_pending++;
3313 static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3315 u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3318 rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3321 * Only send non-zero copyup data to save some I/O and network
	 * bandwidth -- zero copyup data is equivalent to the object not
	 * existing.
3325 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3328 if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3330 * Send a copyup request with an empty snapshot context to
3331 * deep-copyup the object through all existing snapshots.
3332 * A second request with the current snapshot context will be
3333 * sent for the actual modification.
3335 ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3337 obj_req->pending.result = ret;
3341 obj_req->pending.num_pending++;
3345 ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3347 obj_req->pending.result = ret;
3351 obj_req->pending.num_pending++;
3354 static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3356 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3360 switch (obj_req->copyup_state) {
3361 case RBD_OBJ_COPYUP_START:
3362 rbd_assert(!*result);
3364 ret = rbd_obj_copyup_read_parent(obj_req);
3369 if (obj_req->num_img_extents)
3370 obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3372 obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3374 case RBD_OBJ_COPYUP_READ_PARENT:
3378 if (is_zero_bvecs(obj_req->copyup_bvecs,
3379 rbd_obj_img_extents_bytes(obj_req))) {
3380 dout("%s %p detected zeros\n", __func__, obj_req);
3381 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3384 rbd_obj_copyup_object_maps(obj_req);
3385 if (!obj_req->pending.num_pending) {
3386 *result = obj_req->pending.result;
3387 obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3390 obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3392 case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3393 if (!pending_result_dec(&obj_req->pending, result))
3396 case RBD_OBJ_COPYUP_OBJECT_MAPS:
3398 rbd_warn(rbd_dev, "snap object map update failed: %d",
3403 rbd_obj_copyup_write_object(obj_req);
3404 if (!obj_req->pending.num_pending) {
3405 *result = obj_req->pending.result;
3406 obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3409 obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3411 case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3412 if (!pending_result_dec(&obj_req->pending, result))
3415 case RBD_OBJ_COPYUP_WRITE_OBJECT:
 * Return:
 *   0 - object map update sent
 *   1 - object map update isn't needed
 *   <0 - error occurred
3428 static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3430 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3431 u8 current_state = OBJECT_PENDING;
3433 if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3436 if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3439 return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3443 static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3445 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3449 switch (obj_req->write_state) {
3450 case RBD_OBJ_WRITE_START:
3451 rbd_assert(!*result);
3453 if (rbd_obj_write_is_noop(obj_req))
3456 ret = rbd_obj_write_pre_object_map(obj_req);
3461 obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3465 case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3467 rbd_warn(rbd_dev, "pre object map update failed: %d",
3471 ret = rbd_obj_write_object(obj_req);
3476 obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3478 case RBD_OBJ_WRITE_OBJECT:
3479 if (*result == -ENOENT) {
3480 if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3482 obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3483 obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3487 * On a non-existent object:
3488 * delete - -ENOENT, truncate/zero - 0
3490 if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3496 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3498 case __RBD_OBJ_WRITE_COPYUP:
3499 if (!rbd_obj_advance_copyup(obj_req, result))
3502 case RBD_OBJ_WRITE_COPYUP:
3504 rbd_warn(rbd_dev, "copyup failed: %d", *result);
3507 ret = rbd_obj_write_post_object_map(obj_req);
3512 obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3516 case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3518 rbd_warn(rbd_dev, "post object map update failed: %d",
3527 * Return true if @obj_req is completed.
3529 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3532 struct rbd_img_request *img_req = obj_req->img_request;
3533 struct rbd_device *rbd_dev = img_req->rbd_dev;
3536 mutex_lock(&obj_req->state_mutex);
3537 if (!rbd_img_is_write(img_req))
3538 done = rbd_obj_advance_read(obj_req, result);
3540 done = rbd_obj_advance_write(obj_req, result);
3541 mutex_unlock(&obj_req->state_mutex);
3543 if (done && *result) {
3544 rbd_assert(*result < 0);
3545 rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3546 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3547 obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3553 * This is open-coded in rbd_img_handle_request() to avoid parent chain
3556 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3558 if (__rbd_obj_handle_request(obj_req, &result))
3559 rbd_img_handle_request(obj_req->img_request, result);
3562 static bool need_exclusive_lock(struct rbd_img_request *img_req)
3564 struct rbd_device *rbd_dev = img_req->rbd_dev;
3566 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3569 if (rbd_is_snap(rbd_dev))
3572 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3573 if (rbd_dev->opts->lock_on_read ||
3574 (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3577 return rbd_img_is_write(img_req);
3580 static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3582 struct rbd_device *rbd_dev = img_req->rbd_dev;
3585 lockdep_assert_held(&rbd_dev->lock_rwsem);
3586 locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3587 spin_lock(&rbd_dev->lock_lists_lock);
3588 rbd_assert(list_empty(&img_req->lock_item));
3590 list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3592 list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3593 spin_unlock(&rbd_dev->lock_lists_lock);
3597 static void rbd_lock_del_request(struct rbd_img_request *img_req)
3599 struct rbd_device *rbd_dev = img_req->rbd_dev;
3602 lockdep_assert_held(&rbd_dev->lock_rwsem);
3603 spin_lock(&rbd_dev->lock_lists_lock);
3604 rbd_assert(!list_empty(&img_req->lock_item));
3605 list_del_init(&img_req->lock_item);
3606 need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3607 list_empty(&rbd_dev->running_list));
3608 spin_unlock(&rbd_dev->lock_lists_lock);
	if (need_wakeup)
		complete(&rbd_dev->releasing_wait);
3613 static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3615 struct rbd_device *rbd_dev = img_req->rbd_dev;
3617 if (!need_exclusive_lock(img_req))
3620 if (rbd_lock_add_request(img_req))
3623 if (rbd_dev->opts->exclusive) {
3624 WARN_ON(1); /* lock got released? */
3629 * Note the use of mod_delayed_work() in rbd_acquire_lock()
3630 * and cancel_delayed_work() in wake_lock_waiters().
3632 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3633 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3637 static void rbd_img_object_requests(struct rbd_img_request *img_req)
3639 struct rbd_obj_request *obj_req;
3641 rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3643 for_each_obj_request(img_req, obj_req) {
3646 if (__rbd_obj_handle_request(obj_req, &result)) {
3648 img_req->pending.result = result;
3652 img_req->pending.num_pending++;
3657 static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3659 struct rbd_device *rbd_dev = img_req->rbd_dev;
3663 switch (img_req->state) {
3665 rbd_assert(!*result);
3667 ret = rbd_img_exclusive_lock(img_req);
3672 img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3676 case RBD_IMG_EXCLUSIVE_LOCK:
3680 rbd_assert(!need_exclusive_lock(img_req) ||
3681 __rbd_is_lock_owner(rbd_dev));
3683 rbd_img_object_requests(img_req);
3684 if (!img_req->pending.num_pending) {
3685 *result = img_req->pending.result;
3686 img_req->state = RBD_IMG_OBJECT_REQUESTS;
3689 img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3691 case __RBD_IMG_OBJECT_REQUESTS:
3692 if (!pending_result_dec(&img_req->pending, result))
3695 case RBD_IMG_OBJECT_REQUESTS:
3703 * Return true if @img_req is completed.
3705 static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3708 struct rbd_device *rbd_dev = img_req->rbd_dev;
3711 if (need_exclusive_lock(img_req)) {
3712 down_read(&rbd_dev->lock_rwsem);
3713 mutex_lock(&img_req->state_mutex);
3714 done = rbd_img_advance(img_req, result);
3716 rbd_lock_del_request(img_req);
3717 mutex_unlock(&img_req->state_mutex);
3718 up_read(&rbd_dev->lock_rwsem);
3720 mutex_lock(&img_req->state_mutex);
3721 done = rbd_img_advance(img_req, result);
3722 mutex_unlock(&img_req->state_mutex);
3725 if (done && *result) {
3726 rbd_assert(*result < 0);
3727 rbd_warn(rbd_dev, "%s%s result %d",
3728 test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3729 obj_op_name(img_req->op_type), *result);
3734 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3737 if (!__rbd_img_handle_request(img_req, &result))
3740 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3741 struct rbd_obj_request *obj_req = img_req->obj_request;
3743 rbd_img_request_put(img_req);
3744 if (__rbd_obj_handle_request(obj_req, &result)) {
3745 img_req = obj_req->img_request;
3749 struct request *rq = img_req->rq;
3751 rbd_img_request_put(img_req);
3752 blk_mq_end_request(rq, errno_to_blk_status(result));
3756 static const struct rbd_client_id rbd_empty_cid;
3758 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3759 const struct rbd_client_id *rhs)
3761 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3764 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3766 struct rbd_client_id cid;
3768 mutex_lock(&rbd_dev->watch_mutex);
3769 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3770 cid.handle = rbd_dev->watch_cookie;
3771 mutex_unlock(&rbd_dev->watch_mutex);
3776 * lock_rwsem must be held for write
3778 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3779 const struct rbd_client_id *cid)
3781 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3782 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3783 cid->gid, cid->handle);
3784 rbd_dev->owner_cid = *cid; /* struct */
3787 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3789 mutex_lock(&rbd_dev->watch_mutex);
3790 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3791 mutex_unlock(&rbd_dev->watch_mutex);
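/*
 * Editor's sketch (standalone, not driver code): assuming the
 * mainline prefix RBD_LOCK_COOKIE_PREFIX == "auto", a watch cookie of
 * 42 formats as the lock cookie string "auto 42".
 */
#include <stdio.h>

static void ex_format_cookie(char *buf, size_t size,
			     unsigned long long watch_cookie)
{
	snprintf(buf, size, "%s %llu", "auto", watch_cookie);
}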
3794 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3796 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3798 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3799 strcpy(rbd_dev->lock_cookie, cookie);
3800 rbd_set_owner_cid(rbd_dev, &cid);
3801 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3805 * lock_rwsem must be held for write
3807 static int rbd_lock(struct rbd_device *rbd_dev)
3809 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3813 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3814 rbd_dev->lock_cookie[0] != '\0');
3816 format_lock_cookie(rbd_dev, cookie);
3817 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3818 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3819 RBD_LOCK_TAG, "", 0);
3823 __rbd_lock(rbd_dev, cookie);
3828 * lock_rwsem must be held for write
3830 static void rbd_unlock(struct rbd_device *rbd_dev)
3832 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3835 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3836 rbd_dev->lock_cookie[0] == '\0');
3838 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3839 RBD_LOCK_NAME, rbd_dev->lock_cookie);
3840 if (ret && ret != -ENOENT)
3841 rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3843 /* treat errors as the image is unlocked */
3844 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3845 rbd_dev->lock_cookie[0] = '\0';
3846 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3847 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3850 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3851 enum rbd_notify_op notify_op,
3852 struct page ***preply_pages,
3855 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3856 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3857 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3858 int buf_size = sizeof(buf);
3861 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3863 /* encode *LockPayload NotifyMessage (op + ClientId) */
3864 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3865 ceph_encode_32(&p, notify_op);
3866 ceph_encode_64(&p, cid.gid);
3867 ceph_encode_64(&p, cid.handle);
3869 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3870 &rbd_dev->header_oloc, buf, buf_size,
3871 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
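/*
 * Editor's sketch of the 26-byte notify payload built above, assuming
 * CEPH_ENCODING_START_BLK_LEN == 6 (version byte, compat byte, le32
 * payload length):
 *
 *   offset  size  field
 *        0     1  struct_v (2)
 *        1     1  struct_compat (1)
 *        2     4  le32 payload length (20)
 *        6     4  le32 notify_op
 *       10     8  le64 cid.gid
 *       18     8  le64 cid.handle
 */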
3874 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3875 enum rbd_notify_op notify_op)
3877 struct page **reply_pages;
3880 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3881 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3884 static void rbd_notify_acquired_lock(struct work_struct *work)
3886 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3887 acquired_lock_work);
3889 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3892 static void rbd_notify_released_lock(struct work_struct *work)
3894 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3895 released_lock_work);
3897 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3900 static int rbd_request_lock(struct rbd_device *rbd_dev)
3902 struct page **reply_pages;
3904 bool lock_owner_responded = false;
3907 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3909 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3910 &reply_pages, &reply_len);
3911 if (ret && ret != -ETIMEDOUT) {
3912 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3916 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3917 void *p = page_address(reply_pages[0]);
3918 void *const end = p + reply_len;
3921 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3926 ceph_decode_need(&p, end, 8 + 8, e_inval);
3927 p += 8 + 8; /* skip gid and cookie */
3929 ceph_decode_32_safe(&p, end, len, e_inval);
3933 if (lock_owner_responded) {
3935 "duplicate lock owners detected");
3940 lock_owner_responded = true;
3941 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3945 "failed to decode ResponseMessage: %d",
3950 ret = ceph_decode_32(&p);
3954 if (!lock_owner_responded) {
3955 rbd_warn(rbd_dev, "no lock owners detected");
3960 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
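/*
 * Editor's sketch of the notify reply parsed above:
 *
 *   le32 num_acks
 *   per ack: le64 gid, le64 cookie, le32 len, then len payload bytes
 *
 * An ack with a non-empty payload is a ResponseMessage from the lock
 * owner; its le32 result becomes rbd_request_lock()'s return value.
 */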
 * Either image request state machine(s) or rbd_add_acquire_lock()
 * (i.e. "rbd map").
3972 static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3974 struct rbd_img_request *img_req;
3976 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3977 lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3979 cancel_delayed_work(&rbd_dev->lock_dwork);
3980 if (!completion_done(&rbd_dev->acquire_wait)) {
3981 rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3982 list_empty(&rbd_dev->running_list));
3983 rbd_dev->acquire_err = result;
3984 complete_all(&rbd_dev->acquire_wait);
3988 list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3989 mutex_lock(&img_req->state_mutex);
3990 rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3991 rbd_img_schedule(img_req, result);
3992 mutex_unlock(&img_req->state_mutex);
3995 list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3998 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3999 struct ceph_locker **lockers, u32 *num_lockers)
4001 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4006 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4008 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
4009 &rbd_dev->header_oloc, RBD_LOCK_NAME,
4010 &lock_type, &lock_tag, lockers, num_lockers);
4014 if (*num_lockers == 0) {
4015 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
4019 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
4020 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
4026 if (lock_type == CEPH_CLS_LOCK_SHARED) {
4027 rbd_warn(rbd_dev, "shared lock type detected");
4032 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
4033 strlen(RBD_LOCK_COOKIE_PREFIX))) {
4034 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
4035 (*lockers)[0].id.cookie);
4045 static int find_watcher(struct rbd_device *rbd_dev,
4046 const struct ceph_locker *locker)
4048 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4049 struct ceph_watch_item *watchers;
4055 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
4056 &rbd_dev->header_oloc, &watchers,
4061 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
4062 for (i = 0; i < num_watchers; i++) {
4063 if (!memcmp(&watchers[i].addr, &locker->info.addr,
4064 sizeof(locker->info.addr)) &&
4065 watchers[i].cookie == cookie) {
4066 struct rbd_client_id cid = {
4067 .gid = le64_to_cpu(watchers[i].name.num),
4071 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
4072 rbd_dev, cid.gid, cid.handle);
4073 rbd_set_owner_cid(rbd_dev, &cid);
4079 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
4087 * lock_rwsem must be held for write
4089 static int rbd_try_lock(struct rbd_device *rbd_dev)
4091 struct ceph_client *client = rbd_dev->rbd_client->client;
4092 struct ceph_locker *lockers;
4097 ret = rbd_lock(rbd_dev);
4101 /* determine if the current lock holder is still alive */
4102 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
4106 if (num_lockers == 0)
4109 ret = find_watcher(rbd_dev, lockers);
4111 goto out; /* request lock or error */
4113 rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
4114 ENTITY_NAME(lockers[0].id.name));
4116 ret = ceph_monc_blacklist_add(&client->monc,
4117 &lockers[0].info.addr);
4119 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
4120 ENTITY_NAME(lockers[0].id.name), ret);
4124 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
4125 &rbd_dev->header_oloc, RBD_LOCK_NAME,
4126 lockers[0].id.cookie,
4127 &lockers[0].id.name);
4128 if (ret && ret != -ENOENT)
4132 ceph_free_lockers(lockers, num_lockers);
4136 ceph_free_lockers(lockers, num_lockers);
4140 static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
4144 if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4145 ret = rbd_object_map_open(rbd_dev);
 * Return:
 *   0 - lock acquired
 *   1 - caller should call rbd_request_lock()
 *   <0 - error occurred
4159 static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4163 down_read(&rbd_dev->lock_rwsem);
4164 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4165 rbd_dev->lock_state);
4166 if (__rbd_is_lock_owner(rbd_dev)) {
4167 up_read(&rbd_dev->lock_rwsem);
4171 up_read(&rbd_dev->lock_rwsem);
4172 down_write(&rbd_dev->lock_rwsem);
4173 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4174 rbd_dev->lock_state);
4175 if (__rbd_is_lock_owner(rbd_dev)) {
4176 up_write(&rbd_dev->lock_rwsem);
4180 ret = rbd_try_lock(rbd_dev);
4182 rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4183 if (ret == -EBLACKLISTED)
4186 ret = 1; /* request lock anyway */
4189 up_write(&rbd_dev->lock_rwsem);
4193 rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4194 rbd_assert(list_empty(&rbd_dev->running_list));
4196 ret = rbd_post_acquire_action(rbd_dev);
4198 rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4200 * Can't stay in RBD_LOCK_STATE_LOCKED because
4201 * rbd_lock_add_request() would let the request through,
4202 * assuming that e.g. object map is locked and loaded.
4204 rbd_unlock(rbd_dev);
4208 wake_lock_waiters(rbd_dev, ret);
4209 up_write(&rbd_dev->lock_rwsem);
4213 static void rbd_acquire_lock(struct work_struct *work)
4215 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4216 struct rbd_device, lock_dwork);
4219 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4221 ret = rbd_try_acquire_lock(rbd_dev);
4223 dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4227 ret = rbd_request_lock(rbd_dev);
4228 if (ret == -ETIMEDOUT) {
4229 goto again; /* treat this as a dead client */
4230 } else if (ret == -EROFS) {
4231 rbd_warn(rbd_dev, "peer will not release lock");
4232 down_write(&rbd_dev->lock_rwsem);
4233 wake_lock_waiters(rbd_dev, ret);
4234 up_write(&rbd_dev->lock_rwsem);
4235 } else if (ret < 0) {
4236 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4237 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
		 * lock owner acked, but resend if we don't see them
		 * release the lock
4244 dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4246 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4247 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
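/*
 * Editor's note: with RBD_NOTIFY_TIMEOUT of 5 seconds, the requeue
 * above fires 10 seconds later if the owner that acked our request
 * never actually releases the lock.
 */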
4251 static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4255 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4256 lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4258 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4262 * Ensure that all in-flight IO is flushed.
4264 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4265 rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4266 need_wait = !list_empty(&rbd_dev->running_list);
4267 downgrade_write(&rbd_dev->lock_rwsem);
	if (need_wait)
		wait_for_completion(&rbd_dev->releasing_wait);
4270 up_read(&rbd_dev->lock_rwsem);
4272 down_write(&rbd_dev->lock_rwsem);
4273 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4276 rbd_assert(list_empty(&rbd_dev->running_list));
4280 static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4282 if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4283 rbd_object_map_close(rbd_dev);
4286 static void __rbd_release_lock(struct rbd_device *rbd_dev)
4288 rbd_assert(list_empty(&rbd_dev->running_list));
4290 rbd_pre_release_action(rbd_dev);
4291 rbd_unlock(rbd_dev);
4295 * lock_rwsem must be held for write
4297 static void rbd_release_lock(struct rbd_device *rbd_dev)
4299 if (!rbd_quiesce_lock(rbd_dev))
4302 __rbd_release_lock(rbd_dev);
4305 * Give others a chance to grab the lock - we would re-acquire
4306 * almost immediately if we got new IO while draining the running
4307 * list otherwise. We need to ack our own notifications, so this
4308 * lock_dwork will be requeued from rbd_handle_released_lock() by
4309 * way of maybe_kick_acquire().
4311 cancel_delayed_work(&rbd_dev->lock_dwork);
4314 static void rbd_release_lock_work(struct work_struct *work)
4316 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4319 down_write(&rbd_dev->lock_rwsem);
4320 rbd_release_lock(rbd_dev);
4321 up_write(&rbd_dev->lock_rwsem);
4324 static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4328 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4329 if (__rbd_is_lock_owner(rbd_dev))
4332 spin_lock(&rbd_dev->lock_lists_lock);
4333 have_requests = !list_empty(&rbd_dev->acquiring_list);
4334 spin_unlock(&rbd_dev->lock_lists_lock);
4335 if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4336 dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4337 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4341 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4344 struct rbd_client_id cid = { 0 };
4346 if (struct_v >= 2) {
4347 cid.gid = ceph_decode_64(p);
4348 cid.handle = ceph_decode_64(p);
4351 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4353 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4354 down_write(&rbd_dev->lock_rwsem);
4355 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
			 * we already know that the remote client is
			 * the owner
4360 up_write(&rbd_dev->lock_rwsem);
4364 rbd_set_owner_cid(rbd_dev, &cid);
4365 downgrade_write(&rbd_dev->lock_rwsem);
4367 down_read(&rbd_dev->lock_rwsem);
4370 maybe_kick_acquire(rbd_dev);
4371 up_read(&rbd_dev->lock_rwsem);
4374 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4377 struct rbd_client_id cid = { 0 };
4379 if (struct_v >= 2) {
4380 cid.gid = ceph_decode_64(p);
4381 cid.handle = ceph_decode_64(p);
4384 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4386 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4387 down_write(&rbd_dev->lock_rwsem);
4388 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4389 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
4390 __func__, rbd_dev, cid.gid, cid.handle,
4391 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4392 up_write(&rbd_dev->lock_rwsem);
4396 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4397 downgrade_write(&rbd_dev->lock_rwsem);
4399 down_read(&rbd_dev->lock_rwsem);
4402 maybe_kick_acquire(rbd_dev);
4403 up_read(&rbd_dev->lock_rwsem);
4407 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4408 * ResponseMessage is needed.
4410 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4413 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4414 struct rbd_client_id cid = { 0 };
4417 if (struct_v >= 2) {
4418 cid.gid = ceph_decode_64(p);
4419 cid.handle = ceph_decode_64(p);
4422 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4424 if (rbd_cid_equal(&cid, &my_cid))
4427 down_read(&rbd_dev->lock_rwsem);
4428 if (__rbd_is_lock_owner(rbd_dev)) {
4429 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4430 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
		 * encode ResponseMessage(0) so the peer can detect
		 * a missing owner
4439 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4440 if (!rbd_dev->opts->exclusive) {
4441 dout("%s rbd_dev %p queueing unlock_work\n",
4443 queue_work(rbd_dev->task_wq,
4444 &rbd_dev->unlock_work);
4446 /* refuse to release the lock */
4453 up_read(&rbd_dev->lock_rwsem);
4457 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4458 u64 notify_id, u64 cookie, s32 *result)
4460 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4461 char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4462 int buf_size = sizeof(buf);
4468 /* encode ResponseMessage */
4469 ceph_start_encoding(&p, 1, 1,
4470 buf_size - CEPH_ENCODING_START_BLK_LEN);
4471 ceph_encode_32(&p, *result);
4476 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4477 &rbd_dev->header_oloc, notify_id, cookie,
4480 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4483 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4486 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4487 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4490 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4491 u64 notify_id, u64 cookie, s32 result)
4493 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4494 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4497 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4498 u64 notifier_id, void *data, size_t data_len)
4500 struct rbd_device *rbd_dev = arg;
4502 void *const end = p + data_len;
4508 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4509 __func__, rbd_dev, cookie, notify_id, data_len);
4511 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4514 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4519 notify_op = ceph_decode_32(&p);
4521 /* legacy notification for header updates */
4522 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4526 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4527 switch (notify_op) {
4528 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4529 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4530 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4532 case RBD_NOTIFY_OP_RELEASED_LOCK:
4533 rbd_handle_released_lock(rbd_dev, struct_v, &p);
4534 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4536 case RBD_NOTIFY_OP_REQUEST_LOCK:
4537 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4539 rbd_acknowledge_notify_result(rbd_dev, notify_id,
4542 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4544 case RBD_NOTIFY_OP_HEADER_UPDATE:
4545 ret = rbd_dev_refresh(rbd_dev);
4547 rbd_warn(rbd_dev, "refresh failed: %d", ret);
4549 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4552 if (rbd_is_lock_owner(rbd_dev))
4553 rbd_acknowledge_notify_result(rbd_dev, notify_id,
4554 cookie, -EOPNOTSUPP);
4556 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4561 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4563 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4565 struct rbd_device *rbd_dev = arg;
4567 rbd_warn(rbd_dev, "encountered watch error: %d", err);
4569 down_write(&rbd_dev->lock_rwsem);
4570 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4571 up_write(&rbd_dev->lock_rwsem);
4573 mutex_lock(&rbd_dev->watch_mutex);
4574 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4575 __rbd_unregister_watch(rbd_dev);
4576 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4578 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4580 mutex_unlock(&rbd_dev->watch_mutex);
4584 * watch_mutex must be locked
4586 static int __rbd_register_watch(struct rbd_device *rbd_dev)
4588 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4589 struct ceph_osd_linger_request *handle;
4591 rbd_assert(!rbd_dev->watch_handle);
4592 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4594 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4595 &rbd_dev->header_oloc, rbd_watch_cb,
4596 rbd_watch_errcb, rbd_dev);
	if (IS_ERR(handle))
		return PTR_ERR(handle);
4600 rbd_dev->watch_handle = handle;
4605 * watch_mutex must be locked
4607 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4609 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4612 rbd_assert(rbd_dev->watch_handle);
4613 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4615 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4617 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4619 rbd_dev->watch_handle = NULL;
4622 static int rbd_register_watch(struct rbd_device *rbd_dev)
4626 mutex_lock(&rbd_dev->watch_mutex);
4627 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4628 ret = __rbd_register_watch(rbd_dev);
4632 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4633 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4636 mutex_unlock(&rbd_dev->watch_mutex);
4640 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4642 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4644 cancel_work_sync(&rbd_dev->acquired_lock_work);
4645 cancel_work_sync(&rbd_dev->released_lock_work);
4646 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4647 cancel_work_sync(&rbd_dev->unlock_work);
4650 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4652 cancel_tasks_sync(rbd_dev);
4654 mutex_lock(&rbd_dev->watch_mutex);
4655 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4656 __rbd_unregister_watch(rbd_dev);
4657 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4658 mutex_unlock(&rbd_dev->watch_mutex);
4660 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4661 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4665 * lock_rwsem must be held for write
4667 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4669 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4673 if (!rbd_quiesce_lock(rbd_dev))
4676 format_lock_cookie(rbd_dev, cookie);
4677 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4678 &rbd_dev->header_oloc, RBD_LOCK_NAME,
4679 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4680 RBD_LOCK_TAG, cookie);
4682 if (ret != -EOPNOTSUPP)
4683 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4687 * Lock cookie cannot be updated on older OSDs, so do
4688 * a manual release and queue an acquire.
4690 __rbd_release_lock(rbd_dev);
4691 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4693 __rbd_lock(rbd_dev, cookie);
4694 wake_lock_waiters(rbd_dev, 0);
4698 static void rbd_reregister_watch(struct work_struct *work)
4700 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4701 struct rbd_device, watch_dwork);
4704 dout("%s rbd_dev %p\n", __func__, rbd_dev);
4706 mutex_lock(&rbd_dev->watch_mutex);
4707 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4708 mutex_unlock(&rbd_dev->watch_mutex);
4712 ret = __rbd_register_watch(rbd_dev);
4714 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4715 if (ret != -EBLACKLISTED && ret != -ENOENT) {
4716 queue_delayed_work(rbd_dev->task_wq,
4717 &rbd_dev->watch_dwork,
4719 mutex_unlock(&rbd_dev->watch_mutex);
4723 mutex_unlock(&rbd_dev->watch_mutex);
4724 down_write(&rbd_dev->lock_rwsem);
4725 wake_lock_waiters(rbd_dev, ret);
4726 up_write(&rbd_dev->lock_rwsem);
4730 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4731 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4732 mutex_unlock(&rbd_dev->watch_mutex);
4734 down_write(&rbd_dev->lock_rwsem);
4735 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4736 rbd_reacquire_lock(rbd_dev);
4737 up_write(&rbd_dev->lock_rwsem);
4739 ret = rbd_dev_refresh(rbd_dev);
4741 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
4748 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4749 struct ceph_object_id *oid,
4750 struct ceph_object_locator *oloc,
4751 const char *method_name,
4752 const void *outbound,
4753 size_t outbound_size,
4755 size_t inbound_size)
4757 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4758 struct page *req_page = NULL;
4759 struct page *reply_page;
4763 * Method calls are ultimately read operations. The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
4770 if (outbound_size > PAGE_SIZE)
4773 req_page = alloc_page(GFP_KERNEL);
		if (!req_page)
			return -ENOMEM;

		memcpy(page_address(req_page), outbound, outbound_size);
4780 reply_page = alloc_page(GFP_KERNEL);
	if (!reply_page) {
		if (req_page)
			__free_page(req_page);
		return -ENOMEM;
	}
4787 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4788 CEPH_OSD_FLAG_READ, req_page, outbound_size,
4789 &reply_page, &inbound_size);
	if (!ret) {
		memcpy(inbound, page_address(reply_page), inbound_size);
		ret = inbound_size;
	}
4796 __free_page(req_page);
4797 __free_page(reply_page);
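/*
 * Editor's sketch (hypothetical usage): the v2 header probe fetches
 * an image's object prefix with the "get_object_prefix" class method.
 * reply_buf is a caller-allocated buffer; error handling is trimmed.
 *
 *	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *				  &rbd_dev->header_oloc, "get_object_prefix",
 *				  NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
 */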
4801 static void rbd_queue_workfn(struct work_struct *work)
4803 struct request *rq = blk_mq_rq_from_pdu(work);
4804 struct rbd_device *rbd_dev = rq->q->queuedata;
4805 struct rbd_img_request *img_request;
4806 struct ceph_snap_context *snapc = NULL;
4807 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4808 u64 length = blk_rq_bytes(rq);
4809 enum obj_operation_type op_type;
4813 switch (req_op(rq)) {
4814 case REQ_OP_DISCARD:
4815 op_type = OBJ_OP_DISCARD;
4817 case REQ_OP_WRITE_ZEROES:
4818 op_type = OBJ_OP_ZEROOUT;
4821 op_type = OBJ_OP_WRITE;
4824 op_type = OBJ_OP_READ;
4827 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4832 /* Ignore/skip any zero-length requests */
4835 dout("%s: zero-length request\n", __func__);
4840 if (op_type != OBJ_OP_READ) {
4841 if (rbd_is_ro(rbd_dev)) {
4842 rbd_warn(rbd_dev, "%s on read-only mapping",
4843 obj_op_name(op_type));
4847 rbd_assert(!rbd_is_snap(rbd_dev));
4851 * Quit early if the mapped snapshot no longer exists. It's
4852 * still possible the snapshot will have disappeared by the
4853 * time our request arrives at the osd, but there's no sense in
4854 * sending it if we already know.
4856 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4857 dout("request for non-existent snapshot");
4858 rbd_assert(rbd_is_snap(rbd_dev));
4863 if (offset && length > U64_MAX - offset + 1) {
4864 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4867 goto err_rq; /* Shouldn't happen */
4870 blk_mq_start_request(rq);
4872 down_read(&rbd_dev->header_rwsem);
4873 mapping_size = rbd_dev->mapping.size;
4874 if (op_type != OBJ_OP_READ) {
4875 snapc = rbd_dev->header.snapc;
4876 ceph_get_snap_context(snapc);
4878 up_read(&rbd_dev->header_rwsem);
4880 if (offset + length > mapping_size) {
4881 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4882 length, mapping_size);
4887 img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
4892 img_request->rq = rq;
4893 snapc = NULL; /* img_request consumes a ref */
4895 dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4896 img_request, obj_op_name(op_type), offset, length);
4898 if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4899 result = rbd_img_fill_nodata(img_request, offset, length);
4901 result = rbd_img_fill_from_bio(img_request, offset, length,
4904 goto err_img_request;
4906 rbd_img_handle_request(img_request, 0);
4910 rbd_img_request_put(img_request);
4913 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4914 obj_op_name(op_type), length, offset, result);
4915 ceph_put_snap_context(snapc);
4917 blk_mq_end_request(rq, errno_to_blk_status(result));
4920 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4921 const struct blk_mq_queue_data *bd)
4923 struct request *rq = bd->rq;
4924 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4926 queue_work(rbd_wq, work);
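/*
 * Note (added for clarity): the per-request pdu is the work_struct set
 * up by rbd_init_request() below (tag_set.cmd_size in rbd_init_disk()),
 * so a request and its work item convert back and forth:
 *
 *	struct work_struct *work = blk_mq_rq_to_pdu(rq);
 *	struct request *rq = blk_mq_rq_from_pdu(work);
 */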
4930 static void rbd_free_disk(struct rbd_device *rbd_dev)
4932 blk_cleanup_queue(rbd_dev->disk->queue);
4933 blk_mq_free_tag_set(&rbd_dev->tag_set);
4934 put_disk(rbd_dev->disk);
4935 rbd_dev->disk = NULL;
4938 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4939 struct ceph_object_id *oid,
4940 struct ceph_object_locator *oloc,
4941 void *buf, int buf_len)
4944 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4945 struct ceph_osd_request *req;
4946 struct page **pages;
4947 int num_pages = calc_pages_for(0, buf_len);
4950 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4954 ceph_oid_copy(&req->r_base_oid, oid);
4955 ceph_oloc_copy(&req->r_base_oloc, oloc);
4956 req->r_flags = CEPH_OSD_FLAG_READ;
4958 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4959 if (IS_ERR(pages)) {
4960 ret = PTR_ERR(pages);
4964 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4965 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4968 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4972 ceph_osdc_start_request(osdc, req, false);
4973 ret = ceph_osdc_wait_request(osdc, req);
4975 ceph_copy_from_page_vector(pages, buf, 0, ret);
4978 ceph_osdc_put_request(req);
4983 * Read the complete header for the given rbd device. On successful
4984 * return, the rbd_dev->header field will contain up-to-date
4985 * information about the image.
4987 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4989 struct rbd_image_header_ondisk *ondisk = NULL;
4996 * The complete header will include an array of its 64-bit
4997 * snapshot ids, followed by the names of those snapshots as
4998 * a contiguous block of NUL-terminated strings. Note that
4999 * the number of snapshots could change by the time we read
5000 * it in, in which case we re-read it.
5007 size = sizeof (*ondisk);
5008 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
5010 ondisk = kmalloc(size, GFP_KERNEL);
5014 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
5015 &rbd_dev->header_oloc, ondisk, size);
5018 if ((size_t)ret < size) {
5020 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
5024 if (!rbd_dev_ondisk_valid(ondisk)) {
5026 rbd_warn(rbd_dev, "invalid header");
5030 names_size = le64_to_cpu(ondisk->snap_names_len);
5031 want_count = snap_count;
5032 snap_count = le32_to_cpu(ondisk->snap_count);
5033 } while (snap_count != want_count);
5035 ret = rbd_header_from_disk(rbd_dev, ondisk);
5043 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
5044 * has disappeared from the (just updated) snapshot context.
5046 static void rbd_exists_validate(struct rbd_device *rbd_dev)
5050 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
5053 snap_id = rbd_dev->spec->snap_id;
5054 if (snap_id == CEPH_NOSNAP)
5057 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
5058 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5061 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
5066 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
5067 * try to update its size. If REMOVING is set, updating size
5068 * is just useless work since the device can't be opened.
5070 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
5071 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
5072 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
5073 dout("setting size to %llu sectors", (unsigned long long)size);
5074 set_capacity(rbd_dev->disk, size);
5075 revalidate_disk(rbd_dev->disk);
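/*
 * Worked example (added): a 1 GiB mapping has mapping.size =
 * 1073741824 bytes, so with 512-byte sectors set_capacity() above is
 * handed 1073741824 / 512 = 2097152 sectors.
 */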
5079 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
5084 down_write(&rbd_dev->header_rwsem);
5085 mapping_size = rbd_dev->mapping.size;
5087 ret = rbd_dev_header_info(rbd_dev);
5092 * If there is a parent, see if it has disappeared due to the
5093 * mapped image getting flattened.
5095 if (rbd_dev->parent) {
5096 ret = rbd_dev_v2_parent_info(rbd_dev);
5101 if (!rbd_is_snap(rbd_dev)) {
5102 rbd_dev->mapping.size = rbd_dev->header.image_size;
5104 /* validate mapped snapshot's EXISTS flag */
5105 rbd_exists_validate(rbd_dev);
5109 up_write(&rbd_dev->header_rwsem);
5110 if (!ret && mapping_size != rbd_dev->mapping.size)
5111 rbd_dev_update_size(rbd_dev);
5116 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
5117 unsigned int hctx_idx, unsigned int numa_node)
5119 struct work_struct *work = blk_mq_rq_to_pdu(rq);
5121 INIT_WORK(work, rbd_queue_workfn);
5125 static const struct blk_mq_ops rbd_mq_ops = {
5126 .queue_rq = rbd_queue_rq,
5127 .init_request = rbd_init_request,
5130 static int rbd_init_disk(struct rbd_device *rbd_dev)
5132 struct gendisk *disk;
5133 struct request_queue *q;
5134 unsigned int objset_bytes =
5135 rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
5138 /* create gendisk info */
5139 disk = alloc_disk(single_major ?
5140 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
5141 RBD_MINORS_PER_MAJOR);
5145 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
5147 disk->major = rbd_dev->major;
5148 disk->first_minor = rbd_dev->minor;
5150 disk->flags |= GENHD_FL_EXT_DEVT;
5151 disk->fops = &rbd_bd_ops;
5152 disk->private_data = rbd_dev;
5154 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
5155 rbd_dev->tag_set.ops = &rbd_mq_ops;
5156 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
5157 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
5158 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
5159 rbd_dev->tag_set.nr_hw_queues = 1;
5160 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
5162 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
5166 q = blk_mq_init_queue(&rbd_dev->tag_set);
5172 blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
5173 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
5175 blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
5176 q->limits.max_sectors = queue_max_hw_sectors(q);
5177 blk_queue_max_segments(q, USHRT_MAX);
5178 blk_queue_max_segment_size(q, UINT_MAX);
5179 blk_queue_io_min(q, rbd_dev->opts->alloc_size);
5180 blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
5182 if (rbd_dev->opts->trim) {
5183 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
5184 q->limits.discard_granularity = rbd_dev->opts->alloc_size;
5185 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
5186 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
5189 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
5190 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
5193 * disk_release() expects a queue ref from add_disk() and will
5194 * put it. Hold an extra ref until add_disk() is called.
5196 WARN_ON(!blk_get_queue(q));
5198 q->queuedata = rbd_dev;
5200 rbd_dev->disk = disk;
5204 blk_mq_free_tag_set(&rbd_dev->tag_set);
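/*
 * Worked example (added): assuming the default 4 MiB objects
 * (object_size = 4194304) and stripe_count 1, objset_bytes is 4194304,
 * so max_hw_sectors and max_discard_sectors above come out to
 * 4194304 >> SECTOR_SHIFT = 8192 sectors.
 */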
5214 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
5216 return container_of(dev, struct rbd_device, dev);
5219 static ssize_t rbd_size_show(struct device *dev,
5220 struct device_attribute *attr, char *buf)
5222 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5224 return sprintf(buf, "%llu\n",
5225 (unsigned long long)rbd_dev->mapping.size);
5229 * Note this shows the features for whatever's mapped, which is not
5230 * necessarily the base image.
5232 static ssize_t rbd_features_show(struct device *dev,
5233 struct device_attribute *attr, char *buf)
5235 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5237 return sprintf(buf, "0x%016llx\n",
5238 (unsigned long long)rbd_dev->mapping.features);
5241 static ssize_t rbd_major_show(struct device *dev,
5242 struct device_attribute *attr, char *buf)
5244 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5247 return sprintf(buf, "%d\n", rbd_dev->major);
5249 return sprintf(buf, "(none)\n");
5252 static ssize_t rbd_minor_show(struct device *dev,
5253 struct device_attribute *attr, char *buf)
5255 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5257 return sprintf(buf, "%d\n", rbd_dev->minor);
5260 static ssize_t rbd_client_addr_show(struct device *dev,
5261 struct device_attribute *attr, char *buf)
5263 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5264 struct ceph_entity_addr *client_addr =
5265 ceph_client_addr(rbd_dev->rbd_client->client);
5267 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5268 le32_to_cpu(client_addr->nonce));
5271 static ssize_t rbd_client_id_show(struct device *dev,
5272 struct device_attribute *attr, char *buf)
5274 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5276 return sprintf(buf, "client%lld\n",
5277 ceph_client_gid(rbd_dev->rbd_client->client));
5280 static ssize_t rbd_cluster_fsid_show(struct device *dev,
5281 struct device_attribute *attr, char *buf)
5283 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5285 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5288 static ssize_t rbd_config_info_show(struct device *dev,
5289 struct device_attribute *attr, char *buf)
5291 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5293 return sprintf(buf, "%s\n", rbd_dev->config_info);
5296 static ssize_t rbd_pool_show(struct device *dev,
5297 struct device_attribute *attr, char *buf)
5299 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5301 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5304 static ssize_t rbd_pool_id_show(struct device *dev,
5305 struct device_attribute *attr, char *buf)
5307 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5309 return sprintf(buf, "%llu\n",
5310 (unsigned long long) rbd_dev->spec->pool_id);
5313 static ssize_t rbd_pool_ns_show(struct device *dev,
5314 struct device_attribute *attr, char *buf)
5316 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5318 return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5321 static ssize_t rbd_name_show(struct device *dev,
5322 struct device_attribute *attr, char *buf)
5324 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5326 if (rbd_dev->spec->image_name)
5327 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5329 return sprintf(buf, "(unknown)\n");
5332 static ssize_t rbd_image_id_show(struct device *dev,
5333 struct device_attribute *attr, char *buf)
5335 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5337 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5341 * Shows the name of the currently-mapped snapshot (or
5342 * RBD_SNAP_HEAD_NAME for the base image).
5344 static ssize_t rbd_snap_show(struct device *dev,
5345 struct device_attribute *attr,
5348 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5350 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5353 static ssize_t rbd_snap_id_show(struct device *dev,
5354 struct device_attribute *attr, char *buf)
5356 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5358 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5362  * For a v2 image, shows the chain of parent images, separated by empty
5363  * lines.  For v1 images or if there is no parent, shows "(no parent
5364  * image)".
5366 static ssize_t rbd_parent_show(struct device *dev,
5367 struct device_attribute *attr,
5370 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5373 if (!rbd_dev->parent)
5374 return sprintf(buf, "(no parent image)\n");
5376 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5377 struct rbd_spec *spec = rbd_dev->parent_spec;
5379 count += sprintf(&buf[count], "%s"
5380 "pool_id %llu\npool_name %s\n"
5382 "image_id %s\nimage_name %s\n"
5383 "snap_id %llu\nsnap_name %s\n"
5385 !count ? "" : "\n", /* first? */
5386 spec->pool_id, spec->pool_name,
5387 spec->pool_ns ?: "",
5388 spec->image_id, spec->image_name ?: "(unknown)",
5389 spec->snap_id, spec->snap_name,
5390 rbd_dev->parent_overlap);
5396 static ssize_t rbd_image_refresh(struct device *dev,
5397 struct device_attribute *attr,
5401 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5404 ret = rbd_dev_refresh(rbd_dev);
5411 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5412 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5413 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5414 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5415 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5416 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5417 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5418 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5419 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5420 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5421 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5422 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5423 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5424 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5425 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5426 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5427 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5429 static struct attribute *rbd_attrs[] = {
5430 &dev_attr_size.attr,
5431 &dev_attr_features.attr,
5432 &dev_attr_major.attr,
5433 &dev_attr_minor.attr,
5434 &dev_attr_client_addr.attr,
5435 &dev_attr_client_id.attr,
5436 &dev_attr_cluster_fsid.attr,
5437 &dev_attr_config_info.attr,
5438 &dev_attr_pool.attr,
5439 &dev_attr_pool_id.attr,
5440 &dev_attr_pool_ns.attr,
5441 &dev_attr_name.attr,
5442 &dev_attr_image_id.attr,
5443 &dev_attr_current_snap.attr,
5444 &dev_attr_snap_id.attr,
5445 &dev_attr_parent.attr,
5446 &dev_attr_refresh.attr,
5450 static struct attribute_group rbd_attr_group = {
5454 static const struct attribute_group *rbd_attr_groups[] = {
5459 static void rbd_dev_release(struct device *dev);
5461 static const struct device_type rbd_device_type = {
5463 .groups = rbd_attr_groups,
5464 .release = rbd_dev_release,
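/*
 * Usage sketch (added; see Documentation/ABI/testing/sysfs-bus-rbd):
 * once a device is mapped, the attributes above appear under
 * /sys/bus/rbd/devices/<dev-id>/, so e.g. reading the "size" file
 * invokes rbd_size_show() above.
 */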
5467 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5469 kref_get(&spec->kref);
5474 static void rbd_spec_free(struct kref *kref);
5475 static void rbd_spec_put(struct rbd_spec *spec)
5478 kref_put(&spec->kref, rbd_spec_free);
5481 static struct rbd_spec *rbd_spec_alloc(void)
5483 struct rbd_spec *spec;
5485 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5489 spec->pool_id = CEPH_NOPOOL;
5490 spec->snap_id = CEPH_NOSNAP;
5491 kref_init(&spec->kref);
5496 static void rbd_spec_free(struct kref *kref)
5498 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5500 kfree(spec->pool_name);
5501 kfree(spec->pool_ns);
5502 kfree(spec->image_id);
5503 kfree(spec->image_name);
5504 kfree(spec->snap_name);
5508 static void rbd_dev_free(struct rbd_device *rbd_dev)
5510 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5511 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5513 ceph_oid_destroy(&rbd_dev->header_oid);
5514 ceph_oloc_destroy(&rbd_dev->header_oloc);
5515 kfree(rbd_dev->config_info);
5517 rbd_put_client(rbd_dev->rbd_client);
5518 rbd_spec_put(rbd_dev->spec);
5519 kfree(rbd_dev->opts);
5523 static void rbd_dev_release(struct device *dev)
5525 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5526 bool need_put = !!rbd_dev->opts;
5529 destroy_workqueue(rbd_dev->task_wq);
5530 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5533 rbd_dev_free(rbd_dev);
5536 	 * This is racy, but way better than dropping the module reference
5537 	 * outside of the release callback.  The race window is pretty small,
5538 	 * so doing something similar to dm (dm-builtin.c) is overkill.
5541 module_put(THIS_MODULE);
5544 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
5545 struct rbd_spec *spec)
5547 struct rbd_device *rbd_dev;
5549 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5553 spin_lock_init(&rbd_dev->lock);
5554 INIT_LIST_HEAD(&rbd_dev->node);
5555 init_rwsem(&rbd_dev->header_rwsem);
5557 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5558 ceph_oid_init(&rbd_dev->header_oid);
5559 rbd_dev->header_oloc.pool = spec->pool_id;
5560 if (spec->pool_ns) {
5561 WARN_ON(!*spec->pool_ns);
5562 rbd_dev->header_oloc.pool_ns =
5563 ceph_find_or_create_string(spec->pool_ns,
5564 strlen(spec->pool_ns));
5567 mutex_init(&rbd_dev->watch_mutex);
5568 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5569 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5571 init_rwsem(&rbd_dev->lock_rwsem);
5572 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5573 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5574 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5575 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5576 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5577 spin_lock_init(&rbd_dev->lock_lists_lock);
5578 INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5579 INIT_LIST_HEAD(&rbd_dev->running_list);
5580 init_completion(&rbd_dev->acquire_wait);
5581 init_completion(&rbd_dev->releasing_wait);
5583 spin_lock_init(&rbd_dev->object_map_lock);
5585 rbd_dev->dev.bus = &rbd_bus_type;
5586 rbd_dev->dev.type = &rbd_device_type;
5587 rbd_dev->dev.parent = &rbd_root_dev;
5588 device_initialize(&rbd_dev->dev);
5590 rbd_dev->rbd_client = rbdc;
5591 rbd_dev->spec = spec;
5597 * Create a mapping rbd_dev.
5599 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5600 struct rbd_spec *spec,
5601 struct rbd_options *opts)
5603 struct rbd_device *rbd_dev;
5605 rbd_dev = __rbd_dev_create(rbdc, spec);
5609 rbd_dev->opts = opts;
5611 /* get an id and fill in device name */
5612 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5613 minor_to_rbd_dev_id(1 << MINORBITS),
5615 if (rbd_dev->dev_id < 0)
5618 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5619 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5621 if (!rbd_dev->task_wq)
5624 /* we have a ref from do_rbd_add() */
5625 __module_get(THIS_MODULE);
5627 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5631 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5633 rbd_dev_free(rbd_dev);
5637 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5640 put_device(&rbd_dev->dev);
5644  * Get the size and object order for an image snapshot, or if
5645  * snap_id is CEPH_NOSNAP, gets this information for the base
5646  * image.
5648 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5649 u8 *order, u64 *snap_size)
5651 __le64 snapid = cpu_to_le64(snap_id);
5656 } __attribute__ ((packed)) size_buf = { 0 };
5658 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5659 &rbd_dev->header_oloc, "get_size",
5660 &snapid, sizeof(snapid),
5661 &size_buf, sizeof(size_buf));
5662 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5665 if (ret < sizeof (size_buf))
5669 *order = size_buf.order;
5670 dout(" order %u", (unsigned int)*order);
5672 *snap_size = le64_to_cpu(size_buf.size);
5674 dout(" snap_id 0x%016llx snap_size = %llu\n",
5675 (unsigned long long)snap_id,
5676 (unsigned long long)*snap_size);
5681 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5683 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5684 &rbd_dev->header.obj_order,
5685 &rbd_dev->header.image_size);
5688 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5695 /* Response will be an encoded string, which includes a length */
5696 size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5697 reply_buf = kzalloc(size, GFP_KERNEL);
5701 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5702 &rbd_dev->header_oloc, "get_object_prefix",
5703 NULL, 0, reply_buf, size);
5704 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5709 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5710 p + ret, NULL, GFP_NOIO);
5713 if (IS_ERR(rbd_dev->header.object_prefix)) {
5714 ret = PTR_ERR(rbd_dev->header.object_prefix);
5715 rbd_dev->header.object_prefix = NULL;
5717 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
5725 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5728 __le64 snapid = cpu_to_le64(snap_id);
5732 } __attribute__ ((packed)) features_buf = { 0 };
5736 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5737 &rbd_dev->header_oloc, "get_features",
5738 &snapid, sizeof(snapid),
5739 &features_buf, sizeof(features_buf));
5740 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5743 if (ret < sizeof (features_buf))
5746 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5748 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5753 *snap_features = le64_to_cpu(features_buf.features);
5755 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5756 (unsigned long long)snap_id,
5757 (unsigned long long)*snap_features,
5758 (unsigned long long)le64_to_cpu(features_buf.incompat));
5763 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5765 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5766 &rbd_dev->header.features);
5770  * These are generic image flags, but since they are used only for
5771  * the object map, store them in rbd_dev->object_map_flags.
5773  * For the same reason, this function is called only on object map
5774  * (re)load and not on header refresh.
5776 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5778 __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5782 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5783 &rbd_dev->header_oloc, "get_flags",
5784 &snapid, sizeof(snapid),
5785 &flags, sizeof(flags));
5788 if (ret < sizeof(flags))
5791 rbd_dev->object_map_flags = le64_to_cpu(flags);
5795 struct parent_image_info {
5797 const char *pool_ns;
5798 const char *image_id;
5806 * The caller is responsible for @pii.
5808 static int decode_parent_image_spec(void **p, void *end,
5809 struct parent_image_info *pii)
5815 ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5816 &struct_v, &struct_len);
5820 ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5821 pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5822 if (IS_ERR(pii->pool_ns)) {
5823 ret = PTR_ERR(pii->pool_ns);
5824 pii->pool_ns = NULL;
5827 pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5828 if (IS_ERR(pii->image_id)) {
5829 ret = PTR_ERR(pii->image_id);
5830 pii->image_id = NULL;
5833 ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
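/*
 * Wire layout decoded above (noted for reference): a
 * ceph_start_decoding() header, then pool_id (__le64), pool_ns
 * (length-prefixed string), image_id (length-prefixed string) and
 * snap_id (__le64).
 */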
5840 static int __get_parent_info(struct rbd_device *rbd_dev,
5841 struct page *req_page,
5842 struct page *reply_page,
5843 struct parent_image_info *pii)
5845 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5846 size_t reply_len = PAGE_SIZE;
5850 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5851 "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5852 req_page, sizeof(u64), &reply_page, &reply_len);
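	/*
	 * Added note: -EOPNOTSUPP means the OSD has no "parent_get"
	 * method; returning 1 makes get_parent_info() below fall back
	 * to the legacy "get_parent" method.
	 */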
5854 return ret == -EOPNOTSUPP ? 1 : ret;
5856 p = page_address(reply_page);
5857 end = p + reply_len;
5858 ret = decode_parent_image_spec(&p, end, pii);
5862 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5863 "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5864 req_page, sizeof(u64), &reply_page, &reply_len);
5868 p = page_address(reply_page);
5869 end = p + reply_len;
5870 ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5871 if (pii->has_overlap)
5872 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5881 * The caller is responsible for @pii.
5883 static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5884 struct page *req_page,
5885 struct page *reply_page,
5886 struct parent_image_info *pii)
5888 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5889 size_t reply_len = PAGE_SIZE;
5893 ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5894 "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5895 req_page, sizeof(u64), &reply_page, &reply_len);
5899 p = page_address(reply_page);
5900 end = p + reply_len;
5901 ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5902 pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5903 if (IS_ERR(pii->image_id)) {
5904 ret = PTR_ERR(pii->image_id);
5905 pii->image_id = NULL;
5908 ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5909 pii->has_overlap = true;
5910 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5918 static int get_parent_info(struct rbd_device *rbd_dev,
5919 struct parent_image_info *pii)
5921 struct page *req_page, *reply_page;
5925 req_page = alloc_page(GFP_KERNEL);
5929 reply_page = alloc_page(GFP_KERNEL);
5931 __free_page(req_page);
5935 p = page_address(req_page);
5936 ceph_encode_64(&p, rbd_dev->spec->snap_id);
5937 ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5939 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5942 __free_page(req_page);
5943 __free_page(reply_page);
5947 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5949 struct rbd_spec *parent_spec;
5950 struct parent_image_info pii = { 0 };
5953 parent_spec = rbd_spec_alloc();
5957 ret = get_parent_info(rbd_dev, &pii);
5961 dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5962 __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5963 pii.has_overlap, pii.overlap);
5965 if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5967 		 * Either the parent never existed, or we have
5968 		 * record of it but the image got flattened so it no
5969 		 * longer has a parent.  When the parent of a
5970 		 * layered image disappears we immediately set the
5971 		 * overlap to 0.  The effect of this is that all new
5972 		 * requests will be treated as if the image had no
5973 		 * parent.
5975 		 * If !pii.has_overlap, the parent image spec is not
5976 		 * applicable.  It's there to avoid duplication in each
5977 		 * snapshot record.
5979 if (rbd_dev->parent_overlap) {
5980 rbd_dev->parent_overlap = 0;
5981 rbd_dev_parent_put(rbd_dev);
5982 pr_info("%s: clone image has been flattened\n",
5983 rbd_dev->disk->disk_name);
5986 goto out; /* No parent? No problem. */
5989 /* The ceph file layout needs to fit pool id in 32 bits */
5992 if (pii.pool_id > (u64)U32_MAX) {
5993 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5994 (unsigned long long)pii.pool_id, U32_MAX);
5999 	 * The parent won't change (except when the clone is
6000 	 * flattened, already handled that).  So we only need to
6001 	 * record the parent spec if we have not already done so.
6003 if (!rbd_dev->parent_spec) {
6004 parent_spec->pool_id = pii.pool_id;
6005 if (pii.pool_ns && *pii.pool_ns) {
6006 parent_spec->pool_ns = pii.pool_ns;
6009 parent_spec->image_id = pii.image_id;
6010 pii.image_id = NULL;
6011 parent_spec->snap_id = pii.snap_id;
6013 rbd_dev->parent_spec = parent_spec;
6014 parent_spec = NULL; /* rbd_dev now owns this */
6018 * We always update the parent overlap. If it's zero we issue
6019 * a warning, as we will proceed as if there was no parent.
6023 /* refresh, careful to warn just once */
6024 if (rbd_dev->parent_overlap)
6026 "clone now standalone (overlap became 0)");
6029 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
6032 rbd_dev->parent_overlap = pii.overlap;
6038 kfree(pii.image_id);
6039 rbd_spec_put(parent_spec);
6043 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
6047 __le64 stripe_count;
6048 } __attribute__ ((packed)) striping_info_buf = { 0 };
6049 size_t size = sizeof (striping_info_buf);
6053 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6054 &rbd_dev->header_oloc, "get_stripe_unit_count",
6055 NULL, 0, &striping_info_buf, size);
6056 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6062 p = &striping_info_buf;
6063 rbd_dev->header.stripe_unit = ceph_decode_64(&p);
6064 rbd_dev->header.stripe_count = ceph_decode_64(&p);
6068 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
6070 __le64 data_pool_id;
6073 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6074 &rbd_dev->header_oloc, "get_data_pool",
6075 NULL, 0, &data_pool_id, sizeof(data_pool_id));
6078 if (ret < sizeof(data_pool_id))
6081 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
6082 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
6086 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
6088 CEPH_DEFINE_OID_ONSTACK(oid);
6089 size_t image_id_size;
6094 void *reply_buf = NULL;
6096 char *image_name = NULL;
6099 rbd_assert(!rbd_dev->spec->image_name);
6101 len = strlen(rbd_dev->spec->image_id);
6102 image_id_size = sizeof (__le32) + len;
6103 image_id = kmalloc(image_id_size, GFP_KERNEL);
6108 end = image_id + image_id_size;
6109 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
6111 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
6112 reply_buf = kmalloc(size, GFP_KERNEL);
6116 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
6117 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6118 "dir_get_name", image_id, image_id_size,
6123 end = reply_buf + ret;
6125 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
6126 if (IS_ERR(image_name))
6129 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
6137 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6139 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6140 const char *snap_name;
6143 /* Skip over names until we find the one we are looking for */
6145 snap_name = rbd_dev->header.snap_names;
6146 while (which < snapc->num_snaps) {
6147 if (!strcmp(name, snap_name))
6148 return snapc->snaps[which];
6149 snap_name += strlen(snap_name) + 1;
6155 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6157 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6162 for (which = 0; !found && which < snapc->num_snaps; which++) {
6163 const char *snap_name;
6165 snap_id = snapc->snaps[which];
6166 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
6167 if (IS_ERR(snap_name)) {
6168 /* ignore no-longer existing snapshots */
6169 if (PTR_ERR(snap_name) == -ENOENT)
6174 found = !strcmp(name, snap_name);
6177 return found ? snap_id : CEPH_NOSNAP;
6181 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
6182 * no snapshot by that name is found, or if an error occurs.
6184 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6186 if (rbd_dev->image_format == 1)
6187 return rbd_v1_snap_id_by_name(rbd_dev, name);
6189 return rbd_v2_snap_id_by_name(rbd_dev, name);
6193 * An image being mapped will have everything but the snap id.
6195 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
6197 struct rbd_spec *spec = rbd_dev->spec;
6199 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
6200 rbd_assert(spec->image_id && spec->image_name);
6201 rbd_assert(spec->snap_name);
6203 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
6206 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
6207 if (snap_id == CEPH_NOSNAP)
6210 spec->snap_id = snap_id;
6212 spec->snap_id = CEPH_NOSNAP;
6219 * A parent image will have all ids but none of the names.
6221 * All names in an rbd spec are dynamically allocated. It's OK if we
6222 * can't figure out the name for an image id.
6224 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
6226 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6227 struct rbd_spec *spec = rbd_dev->spec;
6228 const char *pool_name;
6229 const char *image_name;
6230 const char *snap_name;
6233 rbd_assert(spec->pool_id != CEPH_NOPOOL);
6234 rbd_assert(spec->image_id);
6235 rbd_assert(spec->snap_id != CEPH_NOSNAP);
6237 /* Get the pool name; we have to make our own copy of this */
6239 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6241 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6244 pool_name = kstrdup(pool_name, GFP_KERNEL);
6248 /* Fetch the image name; tolerate failure here */
6250 image_name = rbd_dev_image_name(rbd_dev);
6252 rbd_warn(rbd_dev, "unable to get image name");
6254 /* Fetch the snapshot name */
6256 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6257 if (IS_ERR(snap_name)) {
6258 ret = PTR_ERR(snap_name);
6262 spec->pool_name = pool_name;
6263 spec->image_name = image_name;
6264 spec->snap_name = snap_name;
6274 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6283 struct ceph_snap_context *snapc;
6287 * We'll need room for the seq value (maximum snapshot id),
6288 * snapshot count, and array of that many snapshot ids.
6289 * For now we have a fixed upper limit on the number we're
6290 * prepared to receive.
6292 size = sizeof (__le64) + sizeof (__le32) +
6293 RBD_MAX_SNAP_COUNT * sizeof (__le64);
6294 reply_buf = kzalloc(size, GFP_KERNEL);
6298 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6299 &rbd_dev->header_oloc, "get_snapcontext",
6300 NULL, 0, reply_buf, size);
6301 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6306 end = reply_buf + ret;
6308 ceph_decode_64_safe(&p, end, seq, out);
6309 ceph_decode_32_safe(&p, end, snap_count, out);
6312 * Make sure the reported number of snapshot ids wouldn't go
6313 * beyond the end of our buffer. But before checking that,
6314 * make sure the computed size of the snapshot context we
6315 * allocate is representable in a size_t.
6317 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6322 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6326 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6332 for (i = 0; i < snap_count; i++)
6333 snapc->snaps[i] = ceph_decode_64(&p);
6335 ceph_put_snap_context(rbd_dev->header.snapc);
6336 rbd_dev->header.snapc = snapc;
6338 dout(" snap context seq = %llu, snap_count = %u\n",
6339 (unsigned long long)seq, (unsigned int)snap_count);
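/*
 * Reply layout decoded above (noted for reference): seq (__le64, the
 * maximum snapshot id), snap_count (__le32), then snap_count snapshot
 * ids (__le64 each).
 */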
6346 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6357 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6358 reply_buf = kmalloc(size, GFP_KERNEL);
6360 return ERR_PTR(-ENOMEM);
6362 snapid = cpu_to_le64(snap_id);
6363 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6364 &rbd_dev->header_oloc, "get_snapshot_name",
6365 &snapid, sizeof(snapid), reply_buf, size);
6366 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6368 snap_name = ERR_PTR(ret);
6373 end = reply_buf + ret;
6374 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6375 if (IS_ERR(snap_name))
6378 dout(" snap_id 0x%016llx snap_name = %s\n",
6379 (unsigned long long)snap_id, snap_name);
6386 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6388 bool first_time = rbd_dev->header.object_prefix == NULL;
6391 ret = rbd_dev_v2_image_size(rbd_dev);
6396 ret = rbd_dev_v2_header_onetime(rbd_dev);
6401 ret = rbd_dev_v2_snap_context(rbd_dev);
6402 if (ret && first_time) {
6403 kfree(rbd_dev->header.object_prefix);
6404 rbd_dev->header.object_prefix = NULL;
6410 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6412 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6414 if (rbd_dev->image_format == 1)
6415 return rbd_dev_v1_header_info(rbd_dev);
6417 return rbd_dev_v2_header_info(rbd_dev);
6421 * Skips over white space at *buf, and updates *buf to point to the
6422 * first found non-space character (if any). Returns the length of
6423 * the token (string of non-white space characters) found. Note
6424 * that *buf must be terminated with '\0'.
6426 static inline size_t next_token(const char **buf)
6429 * These are the characters that produce nonzero for
6430 * isspace() in the "C" and "POSIX" locales.
6432 const char *spaces = " \f\n\r\t\v";
6434 *buf += strspn(*buf, spaces); /* Find start of token */
6436 return strcspn(*buf, spaces); /* Return token length */
6440 * Finds the next token in *buf, dynamically allocates a buffer big
6441 * enough to hold a copy of it, and copies the token into the new
6442 * buffer. The copy is guaranteed to be terminated with '\0'. Note
6443 * that a duplicate buffer is created even for a zero-length token.
6445 * Returns a pointer to the newly-allocated duplicate, or a null
6446 * pointer if memory for the duplicate was not available. If
6447 * the lenp argument is a non-null pointer, the length of the token
6448 * (not including the '\0') is returned in *lenp.
6450 * If successful, the *buf pointer will be updated to point beyond
6451 * the end of the found token.
6453 * Note: uses GFP_KERNEL for allocation.
6455 static inline char *dup_token(const char **buf, size_t *lenp)
6460 len = next_token(buf);
6461 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6464 *(dup + len) = '\0';
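/*
 * Illustrative sketch (added; not part of the driver): each call
 * consumes one token and advances the buffer, e.g. with buf pointing
 * at "rbd myimage":
 *
 *	char *pool = dup_token(&buf, NULL);   // "rbd", buf at " myimage"
 *	char *image = dup_token(&buf, NULL);  // "myimage"
 */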
6474 * Parse the options provided for an "rbd add" (i.e., rbd image
6475 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
6476 * and the data written is passed here via a NUL-terminated buffer.
6477 * Returns 0 if successful or an error code otherwise.
6479  * The information extracted from these options is recorded in
6480  * the other parameters which return dynamically-allocated
6481  * structures:
6482  *  ceph_opts
6483  *      The address of a pointer that will refer to a ceph options
6484  *      structure.  Caller must release the returned pointer using
6485  *      ceph_destroy_options() when it is no longer needed.
6486  *  rbd_opts
6487  *      Address of an rbd options pointer.  Fully initialized by
6488  *      this function; caller must release with kfree().
6489  *  spec
6490  *      Address of an rbd image specification pointer.  Fully
6491  *      initialized by this function based on parsed options.
6492  *      Caller must release with rbd_spec_put().
6493  *
6494  * The options passed take this form:
6495  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
6496  * where:
6497  *  <mon_addrs>
6498  *      A comma-separated list of one or more monitor addresses.
6499  *      A monitor address is an ip address, optionally followed
6500  *      by a port number (separated by a colon).
6501  *        I.e.:  ip1[:port1][,ip2[:port2]...]
6502  *  <options>
6503  *      A comma-separated list of ceph and/or rbd options.
6504  *  <pool_name>
6505  *      The name of the rados pool containing the rbd image.
6506  *  <image_name>
6507  *      The name of the image in that pool to map.
6508  *  <snap_id>
6509  *      An optional snapshot id.  If provided, the mapping will
6510  *      present data from the image at the time that snapshot was
6511  *      created.  The image head is used if no snapshot id is
6512  *      provided.  Snapshot mappings are always read-only.
6514 static int rbd_add_parse_args(const char *buf,
6515 struct ceph_options **ceph_opts,
6516 struct rbd_options **opts,
6517 struct rbd_spec **rbd_spec)
6521 const char *mon_addrs;
6523 size_t mon_addrs_size;
6524 struct parse_rbd_opts_ctx pctx = { 0 };
6525 struct ceph_options *copts;
6528 /* The first four tokens are required */
6530 len = next_token(&buf);
6532 rbd_warn(NULL, "no monitor address(es) provided");
6536 mon_addrs_size = len + 1;
6540 options = dup_token(&buf, NULL);
6544 rbd_warn(NULL, "no options provided");
6548 pctx.spec = rbd_spec_alloc();
6552 pctx.spec->pool_name = dup_token(&buf, NULL);
6553 if (!pctx.spec->pool_name)
6555 if (!*pctx.spec->pool_name) {
6556 rbd_warn(NULL, "no pool name provided");
6560 pctx.spec->image_name = dup_token(&buf, NULL);
6561 if (!pctx.spec->image_name)
6563 if (!*pctx.spec->image_name) {
6564 rbd_warn(NULL, "no image name provided");
6569 * Snapshot name is optional; default is to use "-"
6570 * (indicating the head/no snapshot).
6572 len = next_token(&buf);
6574 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6575 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6576 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6577 ret = -ENAMETOOLONG;
6580 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6583 *(snap_name + len) = '\0';
6584 pctx.spec->snap_name = snap_name;
6586 /* Initialize all rbd options to the defaults */
6588 pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6592 pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6593 pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6594 pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6595 pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6596 pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6597 pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6598 pctx.opts->trim = RBD_TRIM_DEFAULT;
6600 copts = ceph_parse_options(options, mon_addrs,
6601 mon_addrs + mon_addrs_size - 1,
6602 parse_rbd_opts_token, &pctx);
6603 if (IS_ERR(copts)) {
6604 ret = PTR_ERR(copts);
6611 *rbd_spec = pctx.spec;
6618 rbd_spec_put(pctx.spec);
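/*
 * Example (added) of a buffer this function parses, assuming a monitor
 * at 1.2.3.4:6789:
 *
 *	"1.2.3.4:6789 name=admin rbd myimage mysnap"
 *
 * mon_addrs <- "1.2.3.4:6789", options <- "name=admin", pool_name <-
 * "rbd", image_name <- "myimage" and snap_name <- "mysnap" (snap_name
 * defaults to "-", the image head, when the last token is omitted).
 */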
6624 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6626 down_write(&rbd_dev->lock_rwsem);
6627 if (__rbd_is_lock_owner(rbd_dev))
6628 __rbd_release_lock(rbd_dev);
6629 up_write(&rbd_dev->lock_rwsem);
6633  * If the wait is interrupted, an error is returned even if the lock
6634  * was successfully acquired.  rbd_dev_image_unlock() will release it
6635  * if needed.
6637 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6641 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6642 if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6645 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6649 if (rbd_is_snap(rbd_dev))
6652 rbd_assert(!rbd_is_lock_owner(rbd_dev));
6653 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6654 ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6655 ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6657 ret = rbd_dev->acquire_err;
6659 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6665 rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6670 * The lock may have been released by now, unless automatic lock
6671 * transitions are disabled.
6673 rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6678 * An rbd format 2 image has a unique identifier, distinct from the
6679 * name given to it by the user. Internally, that identifier is
6680 * what's used to specify the names of objects related to the image.
6682 * A special "rbd id" object is used to map an rbd image name to its
6683 * id. If that object doesn't exist, then there is no v2 rbd image
6684 * with the supplied name.
6686 * This function will record the given rbd_dev's image_id field if
6687 * it can be determined, and in that case will return 0. If any
6688 * errors occur a negative errno will be returned and the rbd_dev's
6689 * image_id field will be unchanged (and should be NULL).
6691 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6695 CEPH_DEFINE_OID_ONSTACK(oid);
6700 * When probing a parent image, the image id is already
6701 * known (and the image name likely is not). There's no
6702 * need to fetch the image id again in this case. We
6703 * do still need to set the image format though.
6705 if (rbd_dev->spec->image_id) {
6706 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6712 * First, see if the format 2 image id file exists, and if
6713 * so, get the image's persistent id from it.
6715 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6716 rbd_dev->spec->image_name);
6720 dout("rbd id object name is %s\n", oid.name);
6722 /* Response will be an encoded string, which includes a length */
6723 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6724 response = kzalloc(size, GFP_NOIO);
6730 /* If it doesn't exist we'll assume it's a format 1 image */
6732 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6735 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6736 if (ret == -ENOENT) {
6737 image_id = kstrdup("", GFP_KERNEL);
6738 ret = image_id ? 0 : -ENOMEM;
6740 rbd_dev->image_format = 1;
6741 } else if (ret >= 0) {
6744 image_id = ceph_extract_encoded_string(&p, p + ret,
6746 ret = PTR_ERR_OR_ZERO(image_id);
6748 rbd_dev->image_format = 2;
6752 rbd_dev->spec->image_id = image_id;
6753 dout("image_id is %s\n", image_id);
6757 ceph_oid_destroy(&oid);
6762  * Undo whatever state changes are made by v1 or v2 header info
6763  * call routines.
6765 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6767 struct rbd_image_header *header;
6769 rbd_dev_parent_put(rbd_dev);
6770 rbd_object_map_free(rbd_dev);
6771 rbd_dev_mapping_clear(rbd_dev);
6773 /* Free dynamic fields from the header, then zero it out */
6775 header = &rbd_dev->header;
6776 ceph_put_snap_context(header->snapc);
6777 kfree(header->snap_sizes);
6778 kfree(header->snap_names);
6779 kfree(header->object_prefix);
6780 memset(header, 0, sizeof (*header));
6783 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6787 ret = rbd_dev_v2_object_prefix(rbd_dev);
6792 	 * Get and check features for the image.  Currently the
6793 	 * features are assumed to never change.
6795 ret = rbd_dev_v2_features(rbd_dev);
6799 /* If the image supports fancy striping, get its parameters */
6801 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6802 ret = rbd_dev_v2_striping_info(rbd_dev);
6807 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6808 ret = rbd_dev_v2_data_pool(rbd_dev);
6813 rbd_init_layout(rbd_dev);
6817 rbd_dev->header.features = 0;
6818 kfree(rbd_dev->header.object_prefix);
6819 rbd_dev->header.object_prefix = NULL;
6824 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6825 * rbd_dev_image_probe() recursion depth, which means it's also the
6826 * length of the already discovered part of the parent chain.
6828 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6830 struct rbd_device *parent = NULL;
6833 if (!rbd_dev->parent_spec)
6836 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6837 pr_info("parent chain is too long (%d)\n", depth);
6842 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6849 * Images related by parent/child relationships always share
6850 * rbd_client and spec/parent_spec, so bump their refcounts.
6852 __rbd_get_client(rbd_dev->rbd_client);
6853 rbd_spec_get(rbd_dev->parent_spec);
6855 __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6857 ret = rbd_dev_image_probe(parent, depth);
6861 rbd_dev->parent = parent;
6862 atomic_set(&rbd_dev->parent_ref, 1);
6866 rbd_dev_unparent(rbd_dev);
6867 rbd_dev_destroy(parent);
6871 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6873 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6874 rbd_free_disk(rbd_dev);
6876 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6880  * rbd_dev->header_rwsem must be locked for write and will be unlocked
6881  * upon return.
6883 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6887 /* Record our major and minor device numbers. */
6889 if (!single_major) {
6890 ret = register_blkdev(0, rbd_dev->name);
6892 goto err_out_unlock;
6894 rbd_dev->major = ret;
6897 rbd_dev->major = rbd_major;
6898 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6901 /* Set up the blkdev mapping. */
6903 ret = rbd_init_disk(rbd_dev);
6905 goto err_out_blkdev;
6907 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6908 set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
6910 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6914 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6915 up_write(&rbd_dev->header_rwsem);
6919 rbd_free_disk(rbd_dev);
6922 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6924 up_write(&rbd_dev->header_rwsem);
6928 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6930 struct rbd_spec *spec = rbd_dev->spec;
6933 /* Record the header object name for this rbd image. */
6935 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6936 if (rbd_dev->image_format == 1)
6937 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6938 spec->image_name, RBD_SUFFIX);
6940 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6941 RBD_HEADER_PREFIX, spec->image_id);
6946 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6948 rbd_dev_unprobe(rbd_dev);
6950 rbd_unregister_watch(rbd_dev);
6951 rbd_dev->image_format = 0;
6952 kfree(rbd_dev->spec->image_id);
6953 rbd_dev->spec->image_id = NULL;
6957 * Probe for the existence of the header object for the given rbd
6958 * device. If this image is the one being mapped (i.e., not a
6959 * parent), initiate a watch on its header object before using that
6960 * object to get detailed information about the rbd image.
6962 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6967 * Get the id from the image id object. Unless there's an
6968 * error, rbd_dev->spec->image_id will be filled in with
6969 * a dynamically-allocated string, and rbd_dev->image_format
6970 * will be set to either 1 or 2.
6972 ret = rbd_dev_image_id(rbd_dev);
6976 ret = rbd_dev_header_name(rbd_dev);
6978 goto err_out_format;
6981 ret = rbd_register_watch(rbd_dev);
6984 pr_info("image %s/%s%s%s does not exist\n",
6985 rbd_dev->spec->pool_name,
6986 rbd_dev->spec->pool_ns ?: "",
6987 rbd_dev->spec->pool_ns ? "/" : "",
6988 rbd_dev->spec->image_name);
6989 goto err_out_format;
6993 ret = rbd_dev_header_info(rbd_dev);
6998 * If this image is the one being mapped, we have pool name and
6999 * id, image name and id, and snap name - need to fill snap id.
7000 * Otherwise this is a parent image, identified by pool, image
7001 * and snap ids - need to fill in names for those ids.
7004 ret = rbd_spec_fill_snap_id(rbd_dev);
7006 ret = rbd_spec_fill_names(rbd_dev);
7009 pr_info("snap %s/%s%s%s@%s does not exist\n",
7010 rbd_dev->spec->pool_name,
7011 rbd_dev->spec->pool_ns ?: "",
7012 rbd_dev->spec->pool_ns ? "/" : "",
7013 rbd_dev->spec->image_name,
7014 rbd_dev->spec->snap_name);
7018 ret = rbd_dev_mapping_set(rbd_dev);
7022 if (rbd_is_snap(rbd_dev) &&
7023 (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
7024 ret = rbd_object_map_load(rbd_dev);
7029 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
7030 ret = rbd_dev_v2_parent_info(rbd_dev);
7035 ret = rbd_dev_probe_parent(rbd_dev, depth);
7039 dout("discovered format %u image, header name is %s\n",
7040 rbd_dev->image_format, rbd_dev->header_oid.name);
7044 rbd_dev_unprobe(rbd_dev);
7047 rbd_unregister_watch(rbd_dev);
7049 rbd_dev->image_format = 0;
7050 kfree(rbd_dev->spec->image_id);
7051 rbd_dev->spec->image_id = NULL;
7055 static ssize_t do_rbd_add(struct bus_type *bus,
7059 struct rbd_device *rbd_dev = NULL;
7060 struct ceph_options *ceph_opts = NULL;
7061 struct rbd_options *rbd_opts = NULL;
7062 struct rbd_spec *spec = NULL;
7063 struct rbd_client *rbdc;
7066 if (!try_module_get(THIS_MODULE))
7069 /* parse add command */
7070 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
7074 rbdc = rbd_get_client(ceph_opts);
7081 rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7084 pr_info("pool %s does not exist\n", spec->pool_name);
7085 goto err_out_client;
7087 spec->pool_id = (u64)rc;
7089 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7092 goto err_out_client;
7094 rbdc = NULL; /* rbd_dev now owns this */
7095 spec = NULL; /* rbd_dev now owns this */
7096 rbd_opts = NULL; /* rbd_dev now owns this */
7098 /* if we are mapping a snapshot it will be a read-only mapping */
7099 if (rbd_dev->opts->read_only ||
7100 strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7101 __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7103 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7104 if (!rbd_dev->config_info) {
7106 goto err_out_rbd_dev;
7109 down_write(&rbd_dev->header_rwsem);
7110 rc = rbd_dev_image_probe(rbd_dev, 0);
7112 up_write(&rbd_dev->header_rwsem);
7113 goto err_out_rbd_dev;
7116 if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7117 rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7118 rbd_dev->layout.object_size);
7119 rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7122 rc = rbd_dev_device_setup(rbd_dev);
7124 goto err_out_image_probe;
7126 rc = rbd_add_acquire_lock(rbd_dev);
7128 goto err_out_image_lock;
7130 /* Everything's ready. Announce the disk to the world. */
7132 rc = device_add(&rbd_dev->dev);
7134 goto err_out_image_lock;
7136 add_disk(rbd_dev->disk);
7137 /* see rbd_init_disk() */
7138 blk_put_queue(rbd_dev->disk->queue);
7140 spin_lock(&rbd_dev_list_lock);
7141 list_add_tail(&rbd_dev->node, &rbd_dev_list);
7142 spin_unlock(&rbd_dev_list_lock);
7144 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7145 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7146 rbd_dev->header.features);
7149 module_put(THIS_MODULE);
7153 rbd_dev_image_unlock(rbd_dev);
7154 rbd_dev_device_release(rbd_dev);
7155 err_out_image_probe:
7156 rbd_dev_image_release(rbd_dev);
7158 rbd_dev_destroy(rbd_dev);
7160 rbd_put_client(rbdc);
7167 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
7172 return do_rbd_add(bus, buf, count);
7175 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
7178 return do_rbd_add(bus, buf, count);
7181 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7183 while (rbd_dev->parent) {
7184 struct rbd_device *first = rbd_dev;
7185 struct rbd_device *second = first->parent;
7186 struct rbd_device *third;
7189 		 * Follow to the parent with no grandparent and
7190 		 * remove it.
7192 while (second && (third = second->parent)) {
7197 rbd_dev_image_release(second);
7198 rbd_dev_destroy(second);
7199 first->parent = NULL;
7200 first->parent_overlap = 0;
7202 rbd_assert(first->parent_spec);
7203 rbd_spec_put(first->parent_spec);
7204 first->parent_spec = NULL;
7208 static ssize_t do_rbd_remove(struct bus_type *bus,
7212 struct rbd_device *rbd_dev = NULL;
7213 struct list_head *tmp;
7221 sscanf(buf, "%d %5s", &dev_id, opt_buf);
7223 pr_err("dev_id out of range\n");
7226 if (opt_buf[0] != '\0') {
7227 if (!strcmp(opt_buf, "force")) {
7230 pr_err("bad remove option at '%s'\n", opt_buf);
7236 spin_lock(&rbd_dev_list_lock);
7237 list_for_each(tmp, &rbd_dev_list) {
7238 rbd_dev = list_entry(tmp, struct rbd_device, node);
7239 if (rbd_dev->dev_id == dev_id) {
7245 spin_lock_irq(&rbd_dev->lock);
7246 if (rbd_dev->open_count && !force)
7248 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7251 spin_unlock_irq(&rbd_dev->lock);
7253 spin_unlock(&rbd_dev_list_lock);
7259 * Prevent new IO from being queued and wait for existing
7260 * IO to complete/fail.
7262 blk_mq_freeze_queue(rbd_dev->disk->queue);
7263 blk_set_queue_dying(rbd_dev->disk->queue);
7266 del_gendisk(rbd_dev->disk);
7267 spin_lock(&rbd_dev_list_lock);
7268 list_del_init(&rbd_dev->node);
7269 spin_unlock(&rbd_dev_list_lock);
7270 device_del(&rbd_dev->dev);
7272 rbd_dev_image_unlock(rbd_dev);
7273 rbd_dev_device_release(rbd_dev);
7274 rbd_dev_image_release(rbd_dev);
7275 rbd_dev_destroy(rbd_dev);
7279 static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
7284 return do_rbd_remove(bus, buf, count);
7287 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
7290 return do_rbd_remove(bus, buf, count);
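/*
 * Usage sketch (added; see Documentation/ABI/testing/sysfs-bus-rbd):
 * writing a device id to /sys/bus/rbd/remove (or remove_single_major)
 * unmaps that device; appending " force" is parsed by do_rbd_remove()
 * above and evicts the mapping even while the block device is still
 * open.
 */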
7294 * create control files in sysfs
7297 static int __init rbd_sysfs_init(void)
7301 ret = device_register(&rbd_root_dev);
7305 ret = bus_register(&rbd_bus_type);
7307 device_unregister(&rbd_root_dev);
7312 static void __exit rbd_sysfs_cleanup(void)
7314 bus_unregister(&rbd_bus_type);
7315 device_unregister(&rbd_root_dev);
7318 static int __init rbd_slab_init(void)
7320 rbd_assert(!rbd_img_request_cache);
7321 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7322 if (!rbd_img_request_cache)
7325 rbd_assert(!rbd_obj_request_cache);
7326 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7327 if (!rbd_obj_request_cache)
7333 kmem_cache_destroy(rbd_img_request_cache);
7334 rbd_img_request_cache = NULL;
7338 static void rbd_slab_exit(void)
7340 rbd_assert(rbd_obj_request_cache);
7341 kmem_cache_destroy(rbd_obj_request_cache);
7342 rbd_obj_request_cache = NULL;
7344 rbd_assert(rbd_img_request_cache);
7345 kmem_cache_destroy(rbd_img_request_cache);
7346 rbd_img_request_cache = NULL;
7349 static int __init rbd_init(void)
7353 if (!libceph_compatible(NULL)) {
7354 rbd_warn(NULL, "libceph incompatibility (quitting)");
7358 rc = rbd_slab_init();
7363 * The number of active work items is limited by the number of
7364 * rbd devices * queue depth, so leave @max_active at default.
7366 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7373 rbd_major = register_blkdev(0, RBD_DRV_NAME);
7374 if (rbd_major < 0) {
7380 rc = rbd_sysfs_init();
7382 goto err_out_blkdev;
7385 pr_info("loaded (major %d)\n", rbd_major);
7387 pr_info("loaded\n");
7393 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7395 destroy_workqueue(rbd_wq);
7401 static void __exit rbd_exit(void)
7403 ida_destroy(&rbd_dev_id_ida);
7404 rbd_sysfs_cleanup();
7406 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7407 destroy_workqueue(rbd_wq);
7411 module_init(rbd_init);
7412 module_exit(rbd_exit);
7414 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7415 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7416 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7417 /* following authorship retained from original osdblk.c */
7418 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7420 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7421 MODULE_LICENSE("GPL");