/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/cls_lock_client.h>
#include <linux/ceph/striper.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/blk-mq.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/workqueue.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, return
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

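/*
 * Usage sketch (illustrative, not part of the driver): together the
 * two helpers above implement a saturating reference count.  Because
 * an increment from 0 fails, a reference can only be taken while the
 * count is known to be non-zero, e.g. the parent_ref handling further
 * below:
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0)
 *		;	// got a reference, parent fields are stable
 *	if (atomic_dec_return_safe(&rbd_dev->parent_ref) < 0)
 *		rbd_warn(rbd_dev, "parent reference underflow");
 */
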
#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

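/*
 * For reference (computed, not from the original source): the mask
 * above covers bits 0, 1, 2, 7 and 8, i.e.
 * 0x1 + 0x2 + 0x4 + 0x80 + 0x100 = 0x187.  This is the value that
 * rbd_supported_features_show() below reports through sysfs.
 */
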
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable, so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;

enum obj_request_type {
	OBJ_REQUEST_NODATA = 1,
	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
	OBJ_REQUEST_BVECS,	/* pointer into provided bio_vec array */
	OBJ_REQUEST_OWN_BVECS,	/* private bio_vec array, doesn't own pages */
};

enum obj_operation_type {
	OBJ_OP_READ = 1,
	OBJ_OP_WRITE,
	OBJ_OP_DISCARD,
};

/*
 * Writes go through the following state machine to deal with
 * layering:
 *
 *                       need copyup
 * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
 *         |     ^                              |
 *         v     \------------------------------/
 *       done
 *         ^
 *         |
 * RBD_OBJ_WRITE_FLAT
 *
 * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
 * there is a parent or not.
 */
enum rbd_obj_write_state {
	RBD_OBJ_WRITE_FLAT = 1,
	RBD_OBJ_WRITE_GUARD,
	RBD_OBJ_WRITE_COPYUP,
};

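/*
 * Worked example (illustrative only): a write to an object that does
 * not yet exist in a clone starts in RBD_OBJ_WRITE_GUARD.  The guarded
 * write fails with -ENOENT, the parent data is read and a copyup is
 * issued (the state moves to RBD_OBJ_WRITE_COPYUP).  When the copyup
 * completes, the state returns to RBD_OBJ_WRITE_GUARD and the request
 * finishes; see rbd_obj_handle_write() below for the actual driver.
 */
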
struct rbd_obj_request {
	struct ceph_object_extent ex;
	union {
		bool			tried_parent;	/* for reads */
		enum rbd_obj_write_state write_state;	/* for writes */
	};

	struct rbd_img_request	*img_request;
	struct ceph_file_extent	*img_extents;
	u32			num_img_extents;

	union {
		struct ceph_bio_iter	bio_pos;
		struct {
			struct ceph_bvec_iter	bvec_pos;
			u32			bvec_count;
			u32			bvec_idx;
		};
	};
	struct bio_vec		*copyup_bvecs;
	u32			copyup_bvec_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	enum obj_operation_type	op_type;
	enum obj_request_type	data_type;
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	spinlock_t		completion_lock;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	struct list_head	object_extents;	/* obj_req.ex structs */
	u32			obj_request_count;
	u32			pending_count;

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)

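/*
 * Example (sketch, not from the original source): iterating the object
 * requests of an image request.  The _safe variant must be used when
 * requests are removed inside the loop, as rbd_img_request_destroy()
 * does below:
 *
 *	struct rbd_obj_request *obj_req;
 *
 *	for_each_obj_request(img_req, obj_req)
 *		rbd_obj_request_submit(obj_req);
 */
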
enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64			size;
	u64			features;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);		/* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * single-major requires >= 0.75 version of userspace rbd utility.
 */
static bool single_major = true;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);

static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

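/*
 * Worked example (illustrative): with RBD_SINGLE_MAJOR_PART_SHIFT == 4,
 * each device gets 16 minors.  dev_id 3 maps to minor 48, and minors
 * 48-63 (the whole device plus up to 15 partitions) all map back to
 * dev_id 3.
 */
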
static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				 u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
				     u64 *snap_features);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ro;

	if (get_user(ro, (int __user *)arg))
		return -EFAULT;

	/* Snapshots can't be marked read-write */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	/* Let blkdev_roset() handle it */
	return -ENOTTY;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	case Opt_lock_on_read:
		rbd_opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		rbd_opts->exclusive = true;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

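/*
 * Example (sketch, not from the original source): for a map string
 * carrying the options "queue_depth=128,lock_on_read", libceph hands
 * each token that it does not itself recognize to the callback above,
 * roughly:
 *
 *	struct rbd_options opts = {
 *		.queue_depth = RBD_QUEUE_DEPTH_DEFAULT,
 *	};
 *
 *	parse_rbd_opts_token("queue_depth=128", &opts);
 *						// opts.queue_depth = 128
 *	parse_rbd_opts_token("lock_on_read", &opts);
 *						// opts.lock_on_read = true
 */
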
static char* obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself, so the
 * caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static int wait_for_latest_osdmap(struct ceph_client *client)
{
	u64 newest_epoch;
	int ret;

	ret = ceph_monc_get_version(&client->monc, "osdmap", &newest_epoch);
	if (ret)
		return ret;

	if (client->osdc.osdmap->epoch >= newest_epoch)
		return 0;

	ceph_osdc_maybe_request_map(&client->osdc);
	return ceph_monc_wait_osdmap(&client->monc, newest_epoch,
				     client->options->mount_timeout);
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		ceph_destroy_options(ceph_opts);

		/*
		 * Using an existing client.  Make sure ->pg_pools is up to
		 * date before we look up the pool id in do_rbd_add().
		 */
		ret = wait_for_latest_osdmap(rbdc->client);
		if (ret) {
			rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
			rbd_put_client(rbdc);
			rbdc = ERR_PTR(ret);
		}
	} else {
		rbdc = rbd_client_create(ceph_opts);
	}
	mutex_unlock(&client_mutex);

	return rbdc;
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}

static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

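/*
 * Example (illustrative): for the descending snapshot id array
 * { 12, 7, 3 }, a bsearch() for key 7 with this comparator probes the
 * middle element, sees it compare equal, and returns a pointer to
 * index 1; rbd_dev_snap_index() below turns that pointer back into an
 * array index.
 */
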
/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static void zero_bvec(struct bio_vec *bv)
{
	void *buf;
	unsigned long flags;

	buf = bvec_kmap_irq(bv, &flags);
	memset(buf, 0, bv->bv_len);
	flush_dcache_page(bv->bv_page);
	bvec_kunmap_irq(buf, &flags);
}

static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
{
	struct ceph_bio_iter it = *bio_pos;

	ceph_bio_iter_advance(&it, off);
	ceph_bio_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
{
	struct ceph_bvec_iter it = *bvec_pos;

	ceph_bvec_iter_advance(&it, off);
	ceph_bvec_iter_advance_step(&it, bytes, ({
		zero_bvec(&bv);
	}));
}

/*
 * Zero a range in @obj_req data buffer defined by a bio (list) or
 * (private) bio_vec array.
 *
 * @off is relative to the start of the data buffer.
 */
static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
			       u32 bytes)
{
	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		zero_bios(&obj_req->bio_pos, off, bytes);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		zero_bvecs(&obj_req->bvec_pos, off, bytes);
		break;
	default:
		rbd_assert(0);
	}
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
	     kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	img_request->obj_request_count++;
	img_request->pending_count++;
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	list_del(&obj_request->ex.oe_item);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->img_request == img_request);
	rbd_obj_request_put(obj_request);
}

static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
	     obj_request, obj_request->ex.oe_objno, obj_request->ex.oe_off,
	     obj_request->ex.oe_len, osd_req);
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return !obj_req->ex.oe_off &&
	       obj_req->ex.oe_len == rbd_dev->layout.object_size;
}

static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;

	return obj_req->ex.oe_off + obj_req->ex.oe_len ==
					rbd_dev->layout.object_size;
}

static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
{
	return ceph_file_extents_bytes(obj_req->img_extents,
				       obj_req->num_img_extents);
}

static bool rbd_img_is_write(struct rbd_img_request *img_req)
{
	switch (img_req->op_type) {
	case OBJ_OP_READ:
		return false;
	case OBJ_OP_WRITE:
	case OBJ_OP_DISCARD:
		return true;
	default:
		BUG();
	}
}

static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
{
	struct rbd_obj_request *obj_req = osd_req->r_priv;

	dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
	     osd_req->r_result, obj_req);
	rbd_assert(osd_req == obj_req->osd_req);

	obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
	if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
		obj_req->xferred = osd_req->r_result;
	else
		/*
		 * Writes aren't allowed to return a data payload.  In some
		 * guarded write cases (e.g. stat + zero on an empty object)
		 * a stat response makes it through, but we don't care.
		 */
		obj_req->xferred = 0;

	rbd_obj_handle_request(obj_req);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req->r_snapid = obj_request->img_request->snap_id;
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts(&osd_req->r_mtime);
	osd_req->r_data_offset = obj_request->ex.oe_off;
}

static struct ceph_osd_request *
rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
{
	struct rbd_img_request *img_req = obj_req->img_request;
	struct rbd_device *rbd_dev = img_req->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_request *req;
	const char *name_format = rbd_dev->image_format == 1 ?
				      RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;

	req = ceph_osdc_alloc_request(osdc,
			(rbd_img_is_write(img_req) ? img_req->snapc : NULL),
			num_ops, false, GFP_NOIO);
	if (!req)
		return NULL;

	req->r_callback = rbd_osd_req_callback;
	req->r_priv = obj_req;

	req->r_base_oloc.pool = rbd_dev->layout.pool_id;
	if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
		goto err_req;

	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
		goto err_req;

	return req;

err_req:
	ceph_osdc_put_request(req);
	return NULL;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

static struct rbd_obj_request *rbd_obj_request_create(void)
{
	struct rbd_obj_request *obj_request;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request)
		return NULL;

	ceph_object_extent_init(&obj_request->ex);
	kref_init(&obj_request->kref);

	dout("%s %p\n", __func__, obj_request);
	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;
	u32 i;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	switch (obj_request->img_request->data_type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_BVECS:
		break;		/* Nothing to do */
	case OBJ_REQUEST_OWN_BVECS:
		kfree(obj_request->bvec_pos.bvecs);
		break;
	default:
		rbd_assert(0);
	}

	kfree(obj_request->img_extents);
	if (obj_request->copyup_bvecs) {
		for (i = 0; i < obj_request->copyup_bvec_count; i++) {
			if (obj_request->copyup_bvecs[i].bv_page)
				__free_page(obj_request->copyup_bvecs[i].bv_page);
		}
		kfree(obj_request->copyup_bvecs);
	}

	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow");
}

/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter = 0;

	if (!rbd_dev->parent_spec)
		return false;

	down_read(&rbd_dev->header_rwsem);
	if (rbd_dev->parent_overlap)
		counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	up_read(&rbd_dev->header_rwsem);

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow");

	return counter > 0;
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					enum obj_operation_type op_type,
					struct ceph_snap_context *snapc)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
	if (!img_request)
		return NULL;

	img_request->rbd_dev = rbd_dev;
	img_request->op_type = op_type;
	if (!rbd_img_is_write(img_request))
		img_request->snap_id = rbd_dev->spec->snap_id;
	else
		img_request->snapc = snapc;

	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);

	spin_lock_init(&img_request->completion_lock);
	INIT_LIST_HEAD(&img_request->object_extents);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s -> img %p\n", __func__, rbd_dev,
	     obj_op_name(op_type), img_request);
	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (rbd_img_is_write(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static void prune_extents(struct ceph_file_extent *img_extents,
			  u32 *num_img_extents, u64 overlap)
{
	u32 cnt = *num_img_extents;

	/* drop extents completely beyond the overlap */
	while (cnt && img_extents[cnt - 1].fe_off >= overlap)
		cnt--;

	if (cnt) {
		struct ceph_file_extent *ex = &img_extents[cnt - 1];

		/* trim final overlapping extent */
		if (ex->fe_off + ex->fe_len > overlap)
			ex->fe_len = overlap - ex->fe_off;
	}

	*num_img_extents = cnt;
}

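/*
 * Worked example (illustrative): with extents { 0~4096, 8192~4096 }
 * and overlap 10240, the second extent starts below the overlap but
 * crosses it, so it is trimmed to 8192~2048.  With overlap 8192 the
 * second extent starts at or beyond the overlap and is dropped
 * entirely, leaving just { 0~4096 }.
 */
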
/*
 * Determine the byte range(s) covered by either just the object extent
 * or the entire object in the parent image.
 */
static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
				    bool entire)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

	if (!rbd_dev->parent_overlap)
		return 0;

	ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
				  entire ? 0 : obj_req->ex.oe_off,
				  entire ? rbd_dev->layout.object_size :
							obj_req->ex.oe_len,
				  &obj_req->img_extents,
				  &obj_req->num_img_extents);
	if (ret)
		return ret;

	prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
		      rbd_dev->parent_overlap);
	return 0;
}

static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
{
	switch (obj_req->img_request->data_type) {
	case OBJ_REQUEST_BIO:
		osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
					       &obj_req->bio_pos,
					       obj_req->ex.oe_len);
		break;
	case OBJ_REQUEST_BVECS:
	case OBJ_REQUEST_OWN_BVECS:
		rbd_assert(obj_req->bvec_pos.iter.bi_size ==
							obj_req->ex.oe_len);
		rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
		osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
						    &obj_req->bvec_pos);
		break;
	default:
		rbd_assert(0);
	}
}

static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
{
	obj_req->osd_req = rbd_osd_req_create(obj_req, 1);
	if (!obj_req->osd_req)
		return -ENOMEM;

	osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
	rbd_osd_req_setup_data(obj_req, 0);

	rbd_osd_req_format_read(obj_req);
	return 0;
}

static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
				unsigned int which)
{
	struct page **pages;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
	osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
				     8 + sizeof(struct ceph_timespec),
				     0, false, true);
	return 0;
}

static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
				  unsigned int which)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	u16 opcode;

	osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
				   rbd_dev->layout.object_size,
				   rbd_dev->layout.object_size);

	if (rbd_obj_is_entire(obj_req))
		opcode = CEPH_OSD_OP_WRITEFULL;
	else
		opcode = CEPH_OSD_OP_WRITE;

	osd_req_op_extent_init(obj_req->osd_req, which, opcode,
			       obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
	rbd_osd_req_setup_data(obj_req, which++);

	rbd_assert(which == obj_req->osd_req->r_num_ops);
	rbd_osd_req_format_write(obj_req);
}

static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
{
	unsigned int num_osd_ops, which = 0;
	int ret;

	/* reverse map the entire object onto the parent */
	ret = rbd_obj_calc_img_extents(obj_req, true);
	if (ret)
		return ret;

	if (obj_req->num_img_extents) {
		obj_req->write_state = RBD_OBJ_WRITE_GUARD;
		num_osd_ops = 3; /* stat + setallochint + write/writefull */
	} else {
		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
		num_osd_ops = 2; /* setallochint + write/writefull */
	}

	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
	if (!obj_req->osd_req)
		return -ENOMEM;

	if (obj_req->num_img_extents) {
		ret = __rbd_obj_setup_stat(obj_req, which++);
		if (ret)
			return ret;
	}

	__rbd_obj_setup_write(obj_req, which);
	return 0;
}

static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
				    unsigned int which)
{
	u16 opcode;

	if (rbd_obj_is_entire(obj_req)) {
		if (obj_req->num_img_extents) {
			osd_req_op_init(obj_req->osd_req, which++,
					CEPH_OSD_OP_CREATE, 0);
			opcode = CEPH_OSD_OP_TRUNCATE;
		} else {
			osd_req_op_init(obj_req->osd_req, which++,
					CEPH_OSD_OP_DELETE, 0);
			opcode = 0;
		}
	} else if (rbd_obj_is_tail(obj_req)) {
		opcode = CEPH_OSD_OP_TRUNCATE;
	} else {
		opcode = CEPH_OSD_OP_ZERO;
	}

	if (opcode)
		osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
				       obj_req->ex.oe_off, obj_req->ex.oe_len,
				       0, 0);

	rbd_assert(which == obj_req->osd_req->r_num_ops);
	rbd_osd_req_format_write(obj_req);
}

static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
{
	unsigned int num_osd_ops, which = 0;
	int ret;

	/* reverse map the entire object onto the parent */
	ret = rbd_obj_calc_img_extents(obj_req, true);
	if (ret)
		return ret;

	if (rbd_obj_is_entire(obj_req)) {
		obj_req->write_state = RBD_OBJ_WRITE_FLAT;
		if (obj_req->num_img_extents)
			num_osd_ops = 2; /* create + truncate */
		else
			num_osd_ops = 1; /* delete */
	} else {
		if (obj_req->num_img_extents) {
			obj_req->write_state = RBD_OBJ_WRITE_GUARD;
			num_osd_ops = 2; /* stat + truncate/zero */
		} else {
			obj_req->write_state = RBD_OBJ_WRITE_FLAT;
			num_osd_ops = 1; /* truncate/zero */
		}
	}

	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
	if (!obj_req->osd_req)
		return -ENOMEM;

	if (!rbd_obj_is_entire(obj_req) && obj_req->num_img_extents) {
		ret = __rbd_obj_setup_stat(obj_req, which++);
		if (ret)
			return ret;
	}

	__rbd_obj_setup_discard(obj_req, which);
	return 0;
}

/*
 * For each object request in @img_req, allocate an OSD request, add
 * individual OSD ops and prepare them for submission.  The number of
 * OSD ops depends on op_type and the overlap point (if any).
 */
static int __rbd_img_fill_request(struct rbd_img_request *img_req)
{
	struct rbd_obj_request *obj_req;
	int ret;

	for_each_obj_request(img_req, obj_req) {
		switch (img_req->op_type) {
		case OBJ_OP_READ:
			ret = rbd_obj_setup_read(obj_req);
			break;
		case OBJ_OP_WRITE:
			ret = rbd_obj_setup_write(obj_req);
			break;
		case OBJ_OP_DISCARD:
			ret = rbd_obj_setup_discard(obj_req);
			break;
		default:
			rbd_assert(0);
		}
		if (ret)
			return ret;
	}

	return 0;
}

union rbd_img_fill_iter {
	struct ceph_bio_iter	bio_iter;
	struct ceph_bvec_iter	bvec_iter;
};

struct rbd_img_fill_ctx {
	enum obj_request_type	pos_type;
	union rbd_img_fill_iter	*pos;
	union rbd_img_fill_iter	iter;
	ceph_object_extent_fn_t	set_pos_fn;
	ceph_object_extent_fn_t	count_fn;
	ceph_object_extent_fn_t	copy_fn;
};

static struct ceph_object_extent *alloc_object_extent(void *arg)
{
	struct rbd_img_request *img_req = arg;
	struct rbd_obj_request *obj_req;

	obj_req = rbd_obj_request_create();
	if (!obj_req)
		return NULL;

	rbd_img_obj_request_add(img_req, obj_req);
	return &obj_req->ex;
}

/*
 * While su != os && sc == 1 is technically not fancy (it's the same
 * layout as su == os && sc == 1), we can't use the nocopy path for it
 * because ->set_pos_fn() should be called only once per object.
 * ceph_file_to_extents() invokes action_fn once per stripe unit, so
 * treat su != os && sc == 1 as fancy.
 */
static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
{
	return l->stripe_unit != l->object_size;
}

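/*
 * Example (illustrative): the default layout has stripe_unit ==
 * object_size (e.g. both 4M), which is not fancy -- each file extent
 * maps to a single contiguous run within one object.  A STRIPINGV2
 * image with, say, stripe_unit 64K and stripe_count 4 is fancy:
 * consecutive 64K chunks rotate across 4 objects, so the copying path
 * below is needed to split the data buffer accordingly.
 */
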
static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
				       struct ceph_file_extent *img_extents,
				       u32 num_img_extents,
				       struct rbd_img_fill_ctx *fctx)
{
	u32 i;
	int ret;

	img_req->data_type = fctx->pos_type;

	/*
	 * Create object requests and set each object request's starting
	 * position in the provided bio (list) or bio_vec array.
	 */
	fctx->iter = *fctx->pos;
	for (i = 0; i < num_img_extents; i++) {
		ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
					   img_extents[i].fe_off,
					   img_extents[i].fe_len,
					   &img_req->object_extents,
					   alloc_object_extent, img_req,
					   fctx->set_pos_fn, &fctx->iter);
		if (ret)
			return ret;
	}

	return __rbd_img_fill_request(img_req);
}

/*
 * Map a list of image extents to a list of object extents, create the
 * corresponding object requests (normally each to a different object,
 * but not always) and add them to @img_req.  For each object request,
 * set up its data descriptor to point to the corresponding chunk(s) of
 * @fctx->pos data buffer.
 *
 * Because ceph_file_to_extents() will merge adjacent object extents
 * together, each object request's data descriptor may point to multiple
 * different chunks of @fctx->pos data buffer.
 *
 * @fctx->pos data buffer is assumed to be large enough.
 */
static int rbd_img_fill_request(struct rbd_img_request *img_req,
				struct ceph_file_extent *img_extents,
				u32 num_img_extents,
				struct rbd_img_fill_ctx *fctx)
{
	struct rbd_device *rbd_dev = img_req->rbd_dev;
	struct rbd_obj_request *obj_req;
	u32 i;
	int ret;

	if (fctx->pos_type == OBJ_REQUEST_NODATA ||
	    !rbd_layout_is_fancy(&rbd_dev->layout))
		return rbd_img_fill_request_nocopy(img_req, img_extents,
						   num_img_extents, fctx);

	img_req->data_type = OBJ_REQUEST_OWN_BVECS;

	/*
	 * Create object requests and determine ->bvec_count for each object
	 * request.  Note that ->bvec_count sum over all object requests may
	 * be greater than the number of bio_vecs in the provided bio (list)
	 * or bio_vec array because when mapped, those bio_vecs can straddle
	 * stripe unit boundaries.
	 */
	fctx->iter = *fctx->pos;
	for (i = 0; i < num_img_extents; i++) {
		ret = ceph_file_to_extents(&rbd_dev->layout,
					   img_extents[i].fe_off,
					   img_extents[i].fe_len,
					   &img_req->object_extents,
					   alloc_object_extent, img_req,
					   fctx->count_fn, &fctx->iter);
		if (ret)
			return ret;
	}

	for_each_obj_request(img_req, obj_req) {
		obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
					      sizeof(*obj_req->bvec_pos.bvecs),
					      GFP_NOIO);
		if (!obj_req->bvec_pos.bvecs)
			return -ENOMEM;
	}

	/*
	 * Fill in each object request's private bio_vec array, splitting and
	 * rearranging the provided bio_vecs in stripe unit chunks as needed.
	 */
	fctx->iter = *fctx->pos;
	for (i = 0; i < num_img_extents; i++) {
		ret = ceph_iterate_extents(&rbd_dev->layout,
					   img_extents[i].fe_off,
					   img_extents[i].fe_len,
					   &img_req->object_extents,
					   fctx->copy_fn, &fctx->iter);
		if (ret)
			return ret;
	}

	return __rbd_img_fill_request(img_req);
}

static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
			       u64 off, u64 len)
{
	struct ceph_file_extent ex = { off, len };
	union rbd_img_fill_iter dummy;
	struct rbd_img_fill_ctx fctx = {
		.pos_type = OBJ_REQUEST_NODATA,
		.pos = &dummy,
	};

	return rbd_img_fill_request(img_req, &ex, 1, &fctx);
}

static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
{
	struct rbd_obj_request *obj_req =
	    container_of(ex, struct rbd_obj_request, ex);
	struct ceph_bio_iter *it = arg;

	dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
	obj_req->bio_pos = *it;
	ceph_bio_iter_advance(it, bytes);
}

static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
{
	struct rbd_obj_request *obj_req =
	    container_of(ex, struct rbd_obj_request, ex);
	struct ceph_bio_iter *it = arg;

	dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
	ceph_bio_iter_advance_step(it, bytes, ({
		obj_req->bvec_count++;
	}));
}

static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
{
	struct rbd_obj_request *obj_req =
	    container_of(ex, struct rbd_obj_request, ex);
	struct ceph_bio_iter *it = arg;

	dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
	ceph_bio_iter_advance_step(it, bytes, ({
		obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
		obj_req->bvec_pos.iter.bi_size += bv.bv_len;
	}));
}

static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
				   struct ceph_file_extent *img_extents,
				   u32 num_img_extents,
				   struct ceph_bio_iter *bio_pos)
{
	struct rbd_img_fill_ctx fctx = {
		.pos_type = OBJ_REQUEST_BIO,
		.pos = (union rbd_img_fill_iter *)bio_pos,
		.set_pos_fn = set_bio_pos,
		.count_fn = count_bio_bvecs,
		.copy_fn = copy_bio_bvecs,
	};

	return rbd_img_fill_request(img_req, img_extents, num_img_extents,
				    &fctx);
}

static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
				 u64 off, u64 len, struct bio *bio)
{
	struct ceph_file_extent ex = { off, len };
	struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };

	return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
}

static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
{
	struct rbd_obj_request *obj_req =
	    container_of(ex, struct rbd_obj_request, ex);
	struct ceph_bvec_iter *it = arg;

	obj_req->bvec_pos = *it;
	ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
	ceph_bvec_iter_advance(it, bytes);
}

static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
{
	struct rbd_obj_request *obj_req =
	    container_of(ex, struct rbd_obj_request, ex);
	struct ceph_bvec_iter *it = arg;

	ceph_bvec_iter_advance_step(it, bytes, ({
		obj_req->bvec_count++;
	}));
}

static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
{
	struct rbd_obj_request *obj_req =
	    container_of(ex, struct rbd_obj_request, ex);
	struct ceph_bvec_iter *it = arg;

	ceph_bvec_iter_advance_step(it, bytes, ({
		obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
		obj_req->bvec_pos.iter.bi_size += bv.bv_len;
	}));
}

static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
				     struct ceph_file_extent *img_extents,
				     u32 num_img_extents,
				     struct ceph_bvec_iter *bvec_pos)
{
	struct rbd_img_fill_ctx fctx = {
		.pos_type = OBJ_REQUEST_BVECS,
		.pos = (union rbd_img_fill_iter *)bvec_pos,
		.set_pos_fn = set_bvec_pos,
		.count_fn = count_bvecs,
		.copy_fn = copy_bvecs,
	};

	return rbd_img_fill_request(img_req, img_extents, num_img_extents,
				    &fctx);
}

static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
				   struct ceph_file_extent *img_extents,
				   u32 num_img_extents,
				   struct bio_vec *bvecs)
{
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = ceph_file_extents_bytes(img_extents,
							     num_img_extents) },
	};

	return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
					 &it);
}

static void rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;

	dout("%s: img %p\n", __func__, img_request);

	rbd_img_request_get(img_request);
	for_each_obj_request(img_request, obj_request)
		rbd_obj_request_submit(obj_request);

	rbd_img_request_put(img_request);
}

static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
{
	struct rbd_img_request *img_req = obj_req->img_request;
	struct rbd_img_request *child_img_req;
	int ret;

	child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
					       OBJ_OP_READ, NULL);
	if (!child_img_req)
		return -ENOMEM;

	__set_bit(IMG_REQ_CHILD, &child_img_req->flags);
	child_img_req->obj_request = obj_req;

	if (!rbd_img_is_write(img_req)) {
		switch (img_req->data_type) {
		case OBJ_REQUEST_BIO:
			ret = __rbd_img_fill_from_bio(child_img_req,
						      obj_req->img_extents,
						      obj_req->num_img_extents,
						      &obj_req->bio_pos);
			break;
		case OBJ_REQUEST_BVECS:
		case OBJ_REQUEST_OWN_BVECS:
			ret = __rbd_img_fill_from_bvecs(child_img_req,
						      obj_req->img_extents,
						      obj_req->num_img_extents,
						      &obj_req->bvec_pos);
			break;
		default:
			rbd_assert(0);
		}
	} else {
		ret = rbd_img_fill_from_bvecs(child_img_req,
					      obj_req->img_extents,
					      obj_req->num_img_extents,
					      obj_req->copyup_bvecs);
	}
	if (ret) {
		rbd_img_request_put(child_img_req);
		return ret;
	}

	rbd_img_request_submit(child_img_req);
	return 0;
}

static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
{
	struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
	int ret;

	if (obj_req->result == -ENOENT &&
	    rbd_dev->parent_overlap && !obj_req->tried_parent) {
		/* reverse map this object extent onto the parent */
		ret = rbd_obj_calc_img_extents(obj_req, false);
		if (ret) {
			obj_req->result = ret;
			return true;
		}

		if (obj_req->num_img_extents) {
			obj_req->tried_parent = true;
			ret = rbd_obj_read_from_parent(obj_req);
			if (ret) {
				obj_req->result = ret;
				return true;
			}
			return false;
		}
	}

	/*
	 * -ENOENT means a hole in the image -- zero-fill the entire
	 * length of the request.  A short read also implies zero-fill
	 * to the end of the request.  In both cases we update xferred
	 * count to indicate the whole request was satisfied.
	 */
	if (obj_req->result == -ENOENT ||
	    (!obj_req->result && obj_req->xferred < obj_req->ex.oe_len)) {
		rbd_assert(!obj_req->xferred || !obj_req->result);
		rbd_obj_zero_range(obj_req, obj_req->xferred,
				   obj_req->ex.oe_len - obj_req->xferred);
		obj_req->result = 0;
		obj_req->xferred = obj_req->ex.oe_len;
	}

	return true;
}

/*
 * copyup_bvecs pages are never highmem pages
 */
static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
{
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	ceph_bvec_iter_advance_step(&it, bytes, ({
		if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
			       bv.bv_len))
			return false;
	}));
	return true;
}

static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
{
	unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;

	dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
	rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
	rbd_osd_req_destroy(obj_req->osd_req);

	/*
	 * Create a copyup request with the same number of OSD ops as
	 * the original request.  The original request was stat + op(s),
	 * the new copyup request will be copyup + the same op(s).
	 */
	obj_req->osd_req = rbd_osd_req_create(obj_req, num_osd_ops);
	if (!obj_req->osd_req)
		return -ENOMEM;

	/*
	 * Only send non-zero copyup data to save some I/O and network
	 * bandwidth -- zero copyup data is equivalent to the object not
	 * existing.
	 */
	if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
		dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
		bytes = 0;
	}

	osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
			    "copyup");
	osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
					  obj_req->copyup_bvecs, bytes);

	switch (obj_req->img_request->op_type) {
	case OBJ_OP_WRITE:
		__rbd_obj_setup_write(obj_req, 1);
		break;
	case OBJ_OP_DISCARD:
		rbd_assert(!rbd_obj_is_entire(obj_req));
		__rbd_obj_setup_discard(obj_req, 1);
		break;
	default:
		rbd_assert(0);
	}

	rbd_obj_request_submit(obj_req);
	return 0;
}

2368 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
2372 rbd_assert(!obj_req->copyup_bvecs);
2373 obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
2374 obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
2375 sizeof(*obj_req->copyup_bvecs),
2377 if (!obj_req->copyup_bvecs)
2380 for (i = 0; i < obj_req->copyup_bvec_count; i++) {
2381 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
2383 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
2384 if (!obj_req->copyup_bvecs[i].bv_page)
2387 obj_req->copyup_bvecs[i].bv_offset = 0;
2388 obj_req->copyup_bvecs[i].bv_len = len;
2392 rbd_assert(!obj_overlap);
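/*
 * Worked example, not part of the driver: for obj_overlap ==
 * PAGE_SIZE + 1, calc_pages_for(0, obj_overlap) above yields two
 * bvecs -- a full page and a single byte -- so the per-bvec length is
 * always min(remaining, PAGE_SIZE) and the loop leaves obj_overlap at
 * zero, which the rbd_assert() checks.
 */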
2396 static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
2398 struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2401 rbd_assert(obj_req->num_img_extents);
2402 prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2403 rbd_dev->parent_overlap);
2404 if (!obj_req->num_img_extents) {
2406 * The overlap has become 0 (most likely because the
2407 * image has been flattened). Use rbd_obj_issue_copyup()
2408 * to re-submit the original write request -- the copyup
2409 * operation itself will be a no-op, since someone must
2410 * have populated the child object while we weren't
2411 * looking. Move to WRITE_FLAT state as we'll be done
2412 * with the operation once the null copyup completes.
2414 obj_req->write_state = RBD_OBJ_WRITE_FLAT;
2415 return rbd_obj_issue_copyup(obj_req, 0);
2418 ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
2422 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
2423 return rbd_obj_read_from_parent(obj_req);
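/*
 * Illustrative trace, not part of the driver: for a guarded write to
 * a missing object, rbd_obj_handle_write() below moves through
 *
 *   RBD_OBJ_WRITE_GUARD   write completes with -ENOENT
 *   RBD_OBJ_WRITE_COPYUP  parent data read in flight
 *   RBD_OBJ_WRITE_GUARD   copyup + original op(s) in flight
 *   RBD_OBJ_WRITE_FLAT    (fall through) request completed
 *
 * A write that needs no parent data starts and finishes in
 * RBD_OBJ_WRITE_FLAT.
 */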
2426 static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
2431 switch (obj_req->write_state) {
2432 case RBD_OBJ_WRITE_GUARD:
2433 rbd_assert(!obj_req->xferred);
2434 if (obj_req->result == -ENOENT) {
2436 * The target object doesn't exist. Read the data for
2437 * the entire target object up to the overlap point (if
2438 * any) from the parent, so we can use it for a copyup.
2440 ret = rbd_obj_handle_write_guard(obj_req);
2442 obj_req->result = ret;
2448 case RBD_OBJ_WRITE_FLAT:
2449 if (!obj_req->result)
2451 * There is no such thing as a successful short
2452 * write -- indicate the whole request was satisfied.
2454 obj_req->xferred = obj_req->ex.oe_len;
2456 case RBD_OBJ_WRITE_COPYUP:
2457 obj_req->write_state = RBD_OBJ_WRITE_GUARD;
2458 if (obj_req->result)
2461 rbd_assert(obj_req->xferred);
2462 ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
2464 obj_req->result = ret;
2474 * Returns true if @obj_req is completed, or false otherwise.
2476 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2478 switch (obj_req->img_request->op_type) {
2480 return rbd_obj_handle_read(obj_req);
2482 return rbd_obj_handle_write(obj_req);
2483 case OBJ_OP_DISCARD:
2484 if (rbd_obj_handle_write(obj_req)) {
2486 * Hide -ENOENT from delete/truncate/zero -- discarding
2487 * a non-existent object is not a problem.
2489 if (obj_req->result == -ENOENT) {
2490 obj_req->result = 0;
2491 obj_req->xferred = obj_req->ex.oe_len;
2501 static void rbd_obj_end_request(struct rbd_obj_request *obj_req)
2503 struct rbd_img_request *img_req = obj_req->img_request;
2505 rbd_assert((!obj_req->result &&
2506 obj_req->xferred == obj_req->ex.oe_len) ||
2507 (obj_req->result < 0 && !obj_req->xferred));
	if (!obj_req->result) {
		img_req->xferred += obj_req->xferred;
		return;
	}
2513 rbd_warn(img_req->rbd_dev,
2514 "%s at objno %llu %llu~%llu result %d xferred %llu",
2515 obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
2516 obj_req->ex.oe_off, obj_req->ex.oe_len, obj_req->result,
2518 if (!img_req->result) {
2519 img_req->result = obj_req->result;
2520 img_req->xferred = 0;
2524 static void rbd_img_end_child_request(struct rbd_img_request *img_req)
2526 struct rbd_obj_request *obj_req = img_req->obj_request;
2528 rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
2529 rbd_assert((!img_req->result &&
2530 img_req->xferred == rbd_obj_img_extents_bytes(obj_req)) ||
2531 (img_req->result < 0 && !img_req->xferred));
2533 obj_req->result = img_req->result;
2534 obj_req->xferred = img_req->xferred;
2535 rbd_img_request_put(img_req);
2538 static void rbd_img_end_request(struct rbd_img_request *img_req)
2540 rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
2541 rbd_assert((!img_req->result &&
2542 img_req->xferred == blk_rq_bytes(img_req->rq)) ||
2543 (img_req->result < 0 && !img_req->xferred));
2545 blk_mq_end_request(img_req->rq,
2546 errno_to_blk_status(img_req->result));
2547 rbd_img_request_put(img_req);
2550 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
2552 struct rbd_img_request *img_req;
2555 if (!__rbd_obj_handle_request(obj_req))
2558 img_req = obj_req->img_request;
2559 spin_lock(&img_req->completion_lock);
2560 rbd_obj_end_request(obj_req);
2561 rbd_assert(img_req->pending_count);
2562 if (--img_req->pending_count) {
2563 spin_unlock(&img_req->completion_lock);
2567 spin_unlock(&img_req->completion_lock);
2568 if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
2569 obj_req = img_req->obj_request;
2570 rbd_img_end_child_request(img_req);
2573 rbd_img_end_request(img_req);
2576 static const struct rbd_client_id rbd_empty_cid;
2578 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
2579 const struct rbd_client_id *rhs)
2581 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
2584 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
2586 struct rbd_client_id cid;
2588 mutex_lock(&rbd_dev->watch_mutex);
2589 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
2590 cid.handle = rbd_dev->watch_cookie;
2591 mutex_unlock(&rbd_dev->watch_mutex);
2596 * lock_rwsem must be held for write
2598 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
2599 const struct rbd_client_id *cid)
2601 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
2602 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
2603 cid->gid, cid->handle);
2604 rbd_dev->owner_cid = *cid; /* struct */
2607 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
2609 mutex_lock(&rbd_dev->watch_mutex);
2610 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
2611 mutex_unlock(&rbd_dev->watch_mutex);
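/*
 * Illustrative sketch, not part of the driver: the cookie built above
 * is RBD_LOCK_COOKIE_PREFIX followed by the watch cookie, and
 * find_watcher() parses it back with a matching sscanf().  A
 * hypothetical round-trip check:
 */
static bool lock_cookie_matches(const char *cookie, u64 watch_cookie)
{
	u64 v;

	return sscanf(cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &v) == 1 &&
	       v == watch_cookie;
}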
2614 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
2616 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2618 strcpy(rbd_dev->lock_cookie, cookie);
2619 rbd_set_owner_cid(rbd_dev, &cid);
2620 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
2624 * lock_rwsem must be held for write
2626 static int rbd_lock(struct rbd_device *rbd_dev)
2628 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2632 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
2633 rbd_dev->lock_cookie[0] != '\0');
2635 format_lock_cookie(rbd_dev, cookie);
2636 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2637 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
2638 RBD_LOCK_TAG, "", 0);
2642 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
2643 __rbd_lock(rbd_dev, cookie);
2648 * lock_rwsem must be held for write
2650 static void rbd_unlock(struct rbd_device *rbd_dev)
2652 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2655 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
2656 rbd_dev->lock_cookie[0] == '\0');
2658 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
2659 RBD_LOCK_NAME, rbd_dev->lock_cookie);
2660 if (ret && ret != -ENOENT)
2661 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
	/* treat errors as if the image were unlocked */
2664 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
2665 rbd_dev->lock_cookie[0] = '\0';
2666 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
2667 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
2670 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
2671 enum rbd_notify_op notify_op,
2672 struct page ***preply_pages,
2675 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2676 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
2677 char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
2678 int buf_size = sizeof(buf);
2681 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
2683 /* encode *LockPayload NotifyMessage (op + ClientId) */
2684 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
2685 ceph_encode_32(&p, notify_op);
2686 ceph_encode_64(&p, cid.gid);
2687 ceph_encode_64(&p, cid.handle);
2689 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
2690 &rbd_dev->header_oloc, buf, buf_size,
2691 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
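/*
 * Illustrative layout, not part of the driver: the buffer encoded
 * above.  ceph_start_encoding() emits a 6-byte header
 * (CEPH_ENCODING_START_BLK_LEN: u8 struct_v, u8 compat, le32 payload
 * length == 20), followed by the 4 + 8 + 8 payload:
 *
 *   le32 notify_op | le64 cid.gid | le64 cid.handle
 */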
2694 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
2695 enum rbd_notify_op notify_op)
2697 struct page **reply_pages;
2700 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
2701 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
2704 static void rbd_notify_acquired_lock(struct work_struct *work)
2706 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2707 acquired_lock_work);
2709 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
2712 static void rbd_notify_released_lock(struct work_struct *work)
2714 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
2715 released_lock_work);
2717 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
2720 static int rbd_request_lock(struct rbd_device *rbd_dev)
2722 struct page **reply_pages;
2724 bool lock_owner_responded = false;
2727 dout("%s rbd_dev %p\n", __func__, rbd_dev);
2729 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
2730 &reply_pages, &reply_len);
2731 if (ret && ret != -ETIMEDOUT) {
2732 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
2736 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
2737 void *p = page_address(reply_pages[0]);
2738 void *const end = p + reply_len;
2741 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
2746 ceph_decode_need(&p, end, 8 + 8, e_inval);
2747 p += 8 + 8; /* skip gid and cookie */
2749 ceph_decode_32_safe(&p, end, len, e_inval);
2753 if (lock_owner_responded) {
2755 "duplicate lock owners detected");
2760 lock_owner_responded = true;
2761 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
2765 "failed to decode ResponseMessage: %d",
2770 ret = ceph_decode_32(&p);
2774 if (!lock_owner_responded) {
2775 rbd_warn(rbd_dev, "no lock owners detected");
2780 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
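/*
 * Illustrative layout, not part of the driver: the notify reply the
 * loop above walks.  After the le32 ack count, each acker contributes
 *
 *   le64 gid | le64 cookie | le32 payload_len | payload
 *
 * and only the lock owner is expected to respond with a non-empty
 * payload: an encoded ResponseMessage carrying a 32-bit result.
 */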
2788 static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
2790 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
2792 cancel_delayed_work(&rbd_dev->lock_dwork);
2794 wake_up_all(&rbd_dev->lock_waitq);
2796 wake_up(&rbd_dev->lock_waitq);
2799 static int get_lock_owner_info(struct rbd_device *rbd_dev,
2800 struct ceph_locker **lockers, u32 *num_lockers)
2802 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2807 dout("%s rbd_dev %p\n", __func__, rbd_dev);
2809 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
2810 &rbd_dev->header_oloc, RBD_LOCK_NAME,
2811 &lock_type, &lock_tag, lockers, num_lockers);
2815 if (*num_lockers == 0) {
2816 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
2820 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
2821 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
2827 if (lock_type == CEPH_CLS_LOCK_SHARED) {
2828 rbd_warn(rbd_dev, "shared lock type detected");
2833 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
2834 strlen(RBD_LOCK_COOKIE_PREFIX))) {
2835 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
2836 (*lockers)[0].id.cookie);
2846 static int find_watcher(struct rbd_device *rbd_dev,
2847 const struct ceph_locker *locker)
2849 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2850 struct ceph_watch_item *watchers;
2856 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
2857 &rbd_dev->header_oloc, &watchers,
2862 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
2863 for (i = 0; i < num_watchers; i++) {
2864 if (!memcmp(&watchers[i].addr, &locker->info.addr,
2865 sizeof(locker->info.addr)) &&
2866 watchers[i].cookie == cookie) {
2867 struct rbd_client_id cid = {
2868 .gid = le64_to_cpu(watchers[i].name.num),
2872 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
2873 rbd_dev, cid.gid, cid.handle);
2874 rbd_set_owner_cid(rbd_dev, &cid);
2880 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
2888 * lock_rwsem must be held for write
2890 static int rbd_try_lock(struct rbd_device *rbd_dev)
2892 struct ceph_client *client = rbd_dev->rbd_client->client;
2893 struct ceph_locker *lockers;
2898 ret = rbd_lock(rbd_dev);
2902 /* determine if the current lock holder is still alive */
2903 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
2907 if (num_lockers == 0)
2910 ret = find_watcher(rbd_dev, lockers);
2913 ret = 0; /* have to request lock */
2917 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
2918 ENTITY_NAME(lockers[0].id.name));
2920 ret = ceph_monc_blacklist_add(&client->monc,
2921 &lockers[0].info.addr);
2923 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
2924 ENTITY_NAME(lockers[0].id.name), ret);
2928 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
2929 &rbd_dev->header_oloc, RBD_LOCK_NAME,
2930 lockers[0].id.cookie,
2931 &lockers[0].id.name);
2932 if (ret && ret != -ENOENT)
2936 ceph_free_lockers(lockers, num_lockers);
2940 ceph_free_lockers(lockers, num_lockers);
2945 * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
2947 static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
2950 enum rbd_lock_state lock_state;
2952 down_read(&rbd_dev->lock_rwsem);
2953 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
2954 rbd_dev->lock_state);
2955 if (__rbd_is_lock_owner(rbd_dev)) {
2956 lock_state = rbd_dev->lock_state;
2957 up_read(&rbd_dev->lock_rwsem);
2961 up_read(&rbd_dev->lock_rwsem);
2962 down_write(&rbd_dev->lock_rwsem);
2963 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
2964 rbd_dev->lock_state);
2965 if (!__rbd_is_lock_owner(rbd_dev)) {
2966 *pret = rbd_try_lock(rbd_dev);
2968 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
2971 lock_state = rbd_dev->lock_state;
2972 up_write(&rbd_dev->lock_rwsem);
2976 static void rbd_acquire_lock(struct work_struct *work)
2978 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
2979 struct rbd_device, lock_dwork);
2980 enum rbd_lock_state lock_state;
2983 dout("%s rbd_dev %p\n", __func__, rbd_dev);
2985 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
2986 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
2987 if (lock_state == RBD_LOCK_STATE_LOCKED)
2988 wake_requests(rbd_dev, true);
2989 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
2990 rbd_dev, lock_state, ret);
2994 ret = rbd_request_lock(rbd_dev);
2995 if (ret == -ETIMEDOUT) {
2996 goto again; /* treat this as a dead client */
2997 } else if (ret == -EROFS) {
2998 rbd_warn(rbd_dev, "peer will not release lock");
			 * If this is rbd_add_acquire_lock(), we want to fail
			 * immediately -- reuse BLACKLISTED flag.  Otherwise we
			 * want to block.
3004 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3005 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3006 /* wake "rbd map --exclusive" process */
3007 wake_requests(rbd_dev, false);
3009 } else if (ret < 0) {
3010 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3011 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
		 * lock owner acked, but resend if we don't see them
		 * release the lock
3018 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3020 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3021 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3026 * lock_rwsem must be held for write
3028 static bool rbd_release_lock(struct rbd_device *rbd_dev)
3030 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3031 rbd_dev->lock_state);
3032 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3035 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3036 downgrade_write(&rbd_dev->lock_rwsem);
3038 * Ensure that all in-flight IO is flushed.
3040 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3041 * may be shared with other devices.
3043 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3044 up_read(&rbd_dev->lock_rwsem);
3046 down_write(&rbd_dev->lock_rwsem);
3047 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3048 rbd_dev->lock_state);
3049 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3052 rbd_unlock(rbd_dev);
3054 * Give others a chance to grab the lock - we would re-acquire
3055 * almost immediately if we got new IO during ceph_osdc_sync()
3056 * otherwise. We need to ack our own notifications, so this
3057 * lock_dwork will be requeued from rbd_wait_state_locked()
3058 * after wake_requests() in rbd_handle_released_lock().
3060 cancel_delayed_work(&rbd_dev->lock_dwork);
3064 static void rbd_release_lock_work(struct work_struct *work)
3066 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3069 down_write(&rbd_dev->lock_rwsem);
3070 rbd_release_lock(rbd_dev);
3071 up_write(&rbd_dev->lock_rwsem);
3074 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3077 struct rbd_client_id cid = { 0 };
3079 if (struct_v >= 2) {
3080 cid.gid = ceph_decode_64(p);
3081 cid.handle = ceph_decode_64(p);
3084 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3086 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3087 down_write(&rbd_dev->lock_rwsem);
3088 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
			 * we already know that the remote client is
			 * the owner
3093 up_write(&rbd_dev->lock_rwsem);
3097 rbd_set_owner_cid(rbd_dev, &cid);
3098 downgrade_write(&rbd_dev->lock_rwsem);
3100 down_read(&rbd_dev->lock_rwsem);
3103 if (!__rbd_is_lock_owner(rbd_dev))
3104 wake_requests(rbd_dev, false);
3105 up_read(&rbd_dev->lock_rwsem);
3108 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3111 struct rbd_client_id cid = { 0 };
3113 if (struct_v >= 2) {
3114 cid.gid = ceph_decode_64(p);
3115 cid.handle = ceph_decode_64(p);
3118 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3120 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3121 down_write(&rbd_dev->lock_rwsem);
3122 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3123 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3124 __func__, rbd_dev, cid.gid, cid.handle,
3125 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3126 up_write(&rbd_dev->lock_rwsem);
3130 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3131 downgrade_write(&rbd_dev->lock_rwsem);
3133 down_read(&rbd_dev->lock_rwsem);
3136 if (!__rbd_is_lock_owner(rbd_dev))
3137 wake_requests(rbd_dev, false);
3138 up_read(&rbd_dev->lock_rwsem);
3142 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3143 * ResponseMessage is needed.
3145 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3148 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3149 struct rbd_client_id cid = { 0 };
3152 if (struct_v >= 2) {
3153 cid.gid = ceph_decode_64(p);
3154 cid.handle = ceph_decode_64(p);
3157 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3159 if (rbd_cid_equal(&cid, &my_cid))
3162 down_read(&rbd_dev->lock_rwsem);
3163 if (__rbd_is_lock_owner(rbd_dev)) {
3164 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3165 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
			 * encode ResponseMessage(0) so the peer can detect
			 * a missing owner
3174 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3175 if (!rbd_dev->opts->exclusive) {
3176 dout("%s rbd_dev %p queueing unlock_work\n",
3178 queue_work(rbd_dev->task_wq,
3179 &rbd_dev->unlock_work);
3181 /* refuse to release the lock */
3188 up_read(&rbd_dev->lock_rwsem);
3192 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3193 u64 notify_id, u64 cookie, s32 *result)
3195 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3196 char buf[4 + CEPH_ENCODING_START_BLK_LEN];
3197 int buf_size = sizeof(buf);
3203 /* encode ResponseMessage */
3204 ceph_start_encoding(&p, 1, 1,
3205 buf_size - CEPH_ENCODING_START_BLK_LEN);
3206 ceph_encode_32(&p, *result);
3211 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3212 &rbd_dev->header_oloc, notify_id, cookie,
3215 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3218 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3221 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3222 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3225 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3226 u64 notify_id, u64 cookie, s32 result)
3228 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3229 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3232 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3233 u64 notifier_id, void *data, size_t data_len)
3235 struct rbd_device *rbd_dev = arg;
3237 void *const end = p + data_len;
3243 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3244 __func__, rbd_dev, cookie, notify_id, data_len);
3246 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3249 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3254 notify_op = ceph_decode_32(&p);
3256 /* legacy notification for header updates */
3257 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3261 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
	switch (notify_op) {
	case RBD_NOTIFY_OP_ACQUIRED_LOCK:
		rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	case RBD_NOTIFY_OP_RELEASED_LOCK:
		rbd_handle_released_lock(rbd_dev, struct_v, &p);
		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	case RBD_NOTIFY_OP_REQUEST_LOCK:
		ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
		if (ret <= 0)
			rbd_acknowledge_notify_result(rbd_dev, notify_id,
						      cookie, ret);
		else
			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	case RBD_NOTIFY_OP_HEADER_UPDATE:
		ret = rbd_dev_refresh(rbd_dev);
		if (ret)
			rbd_warn(rbd_dev, "refresh failed: %d", ret);
		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	default:
		if (rbd_is_lock_owner(rbd_dev))
			rbd_acknowledge_notify_result(rbd_dev, notify_id,
						      cookie, -EOPNOTSUPP);
		else
			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
		break;
	}
3296 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3298 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
3300 struct rbd_device *rbd_dev = arg;
3302 rbd_warn(rbd_dev, "encountered watch error: %d", err);
3304 down_write(&rbd_dev->lock_rwsem);
3305 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3306 up_write(&rbd_dev->lock_rwsem);
3308 mutex_lock(&rbd_dev->watch_mutex);
3309 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3310 __rbd_unregister_watch(rbd_dev);
3311 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
3313 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
3315 mutex_unlock(&rbd_dev->watch_mutex);
3319 * watch_mutex must be locked
3321 static int __rbd_register_watch(struct rbd_device *rbd_dev)
3323 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3324 struct ceph_osd_linger_request *handle;
3326 rbd_assert(!rbd_dev->watch_handle);
3327 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3329 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3330 &rbd_dev->header_oloc, rbd_watch_cb,
3331 rbd_watch_errcb, rbd_dev);
3333 return PTR_ERR(handle);
3335 rbd_dev->watch_handle = handle;
3340 * watch_mutex must be locked
3342 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
3344 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3347 rbd_assert(rbd_dev->watch_handle);
3348 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3350 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3352 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
3354 rbd_dev->watch_handle = NULL;
3357 static int rbd_register_watch(struct rbd_device *rbd_dev)
3361 mutex_lock(&rbd_dev->watch_mutex);
3362 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3363 ret = __rbd_register_watch(rbd_dev);
3367 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3368 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3371 mutex_unlock(&rbd_dev->watch_mutex);
3375 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
3377 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3379 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
3380 cancel_work_sync(&rbd_dev->acquired_lock_work);
3381 cancel_work_sync(&rbd_dev->released_lock_work);
3382 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3383 cancel_work_sync(&rbd_dev->unlock_work);
3386 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3388 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3389 cancel_tasks_sync(rbd_dev);
3391 mutex_lock(&rbd_dev->watch_mutex);
3392 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3393 __rbd_unregister_watch(rbd_dev);
3394 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3395 mutex_unlock(&rbd_dev->watch_mutex);
3397 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3401 * lock_rwsem must be held for write
3403 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3405 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3409 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3411 format_lock_cookie(rbd_dev, cookie);
3412 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3413 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3414 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3415 RBD_LOCK_TAG, cookie);
3417 if (ret != -EOPNOTSUPP)
3418 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3422 * Lock cookie cannot be updated on older OSDs, so do
3423 * a manual release and queue an acquire.
3425 if (rbd_release_lock(rbd_dev))
3426 queue_delayed_work(rbd_dev->task_wq,
3427 &rbd_dev->lock_dwork, 0);
3429 __rbd_lock(rbd_dev, cookie);
3433 static void rbd_reregister_watch(struct work_struct *work)
3435 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3436 struct rbd_device, watch_dwork);
3439 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3441 mutex_lock(&rbd_dev->watch_mutex);
3442 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3443 mutex_unlock(&rbd_dev->watch_mutex);
3447 ret = __rbd_register_watch(rbd_dev);
3449 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
3450 if (ret == -EBLACKLISTED || ret == -ENOENT) {
3451 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3452 wake_requests(rbd_dev, true);
3454 queue_delayed_work(rbd_dev->task_wq,
3455 &rbd_dev->watch_dwork,
3458 mutex_unlock(&rbd_dev->watch_mutex);
3462 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3463 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3464 mutex_unlock(&rbd_dev->watch_mutex);
3466 down_write(&rbd_dev->lock_rwsem);
3467 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3468 rbd_reacquire_lock(rbd_dev);
3469 up_write(&rbd_dev->lock_rwsem);
3471 ret = rbd_dev_refresh(rbd_dev);
3473 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound (reply) buffer, or a negative error code.
3480 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3481 struct ceph_object_id *oid,
3482 struct ceph_object_locator *oloc,
3483 const char *method_name,
3484 const void *outbound,
3485 size_t outbound_size,
3487 size_t inbound_size)
3489 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3490 struct page *req_page = NULL;
3491 struct page *reply_page;
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
3502 if (outbound_size > PAGE_SIZE)
3505 req_page = alloc_page(GFP_KERNEL);
3509 memcpy(page_address(req_page), outbound, outbound_size);
3512 reply_page = alloc_page(GFP_KERNEL);
3515 __free_page(req_page);
3519 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3520 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3521 reply_page, &inbound_size);
3523 memcpy(inbound, page_address(reply_page), inbound_size);
3528 __free_page(req_page);
3529 __free_page(reply_page);
3534 * lock_rwsem must be held for read
3536 static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3542 * Note the use of mod_delayed_work() in rbd_acquire_lock()
3543 * and cancel_delayed_work() in wake_requests().
3545 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3546 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3547 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
3548 TASK_UNINTERRUPTIBLE);
3549 up_read(&rbd_dev->lock_rwsem);
3551 down_read(&rbd_dev->lock_rwsem);
3552 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
3553 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
3555 finish_wait(&rbd_dev->lock_waitq, &wait);
3558 static void rbd_queue_workfn(struct work_struct *work)
3560 struct request *rq = blk_mq_rq_from_pdu(work);
3561 struct rbd_device *rbd_dev = rq->q->queuedata;
3562 struct rbd_img_request *img_request;
3563 struct ceph_snap_context *snapc = NULL;
3564 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3565 u64 length = blk_rq_bytes(rq);
3566 enum obj_operation_type op_type;
3568 bool must_be_locked;
	switch (req_op(rq)) {
	case REQ_OP_DISCARD:
	case REQ_OP_WRITE_ZEROES:
		op_type = OBJ_OP_DISCARD;
		break;
	case REQ_OP_WRITE:
		op_type = OBJ_OP_WRITE;
		break;
	case REQ_OP_READ:
		op_type = OBJ_OP_READ;
		break;
	default:
		dout("%s: non-fs request type %d\n", __func__, req_op(rq));
3588 /* Ignore/skip any zero-length requests */
3591 dout("%s: zero-length request\n", __func__);
3596 rbd_assert(op_type == OBJ_OP_READ ||
3597 rbd_dev->spec->snap_id == CEPH_NOSNAP);
3600 * Quit early if the mapped snapshot no longer exists. It's
3601 * still possible the snapshot will have disappeared by the
3602 * time our request arrives at the osd, but there's no sense in
3603 * sending it if we already know.
3605 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3606 dout("request for non-existent snapshot");
3607 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3612 if (offset && length > U64_MAX - offset + 1) {
3613 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3616 goto err_rq; /* Shouldn't happen */
3619 blk_mq_start_request(rq);
3621 down_read(&rbd_dev->header_rwsem);
3622 mapping_size = rbd_dev->mapping.size;
3623 if (op_type != OBJ_OP_READ) {
3624 snapc = rbd_dev->header.snapc;
3625 ceph_get_snap_context(snapc);
3627 up_read(&rbd_dev->header_rwsem);
3629 if (offset + length > mapping_size) {
3630 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
3631 length, mapping_size);
	must_be_locked =
	    (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
	    (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
3639 if (must_be_locked) {
3640 down_read(&rbd_dev->lock_rwsem);
3641 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
3642 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
3643 if (rbd_dev->opts->exclusive) {
3644 rbd_warn(rbd_dev, "exclusive lock required");
3648 rbd_wait_state_locked(rbd_dev);
3650 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
3651 result = -EBLACKLISTED;
3656 img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
3661 img_request->rq = rq;
3662 snapc = NULL; /* img_request consumes a ref */
3664 if (op_type == OBJ_OP_DISCARD)
3665 result = rbd_img_fill_nodata(img_request, offset, length);
3667 result = rbd_img_fill_from_bio(img_request, offset, length,
3670 goto err_img_request;
3672 rbd_img_request_submit(img_request);
3674 up_read(&rbd_dev->lock_rwsem);
3678 rbd_img_request_put(img_request);
3681 up_read(&rbd_dev->lock_rwsem);
3684 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
3685 obj_op_name(op_type), length, offset, result);
3686 ceph_put_snap_context(snapc);
3688 blk_mq_end_request(rq, errno_to_blk_status(result));
3691 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
3692 const struct blk_mq_queue_data *bd)
3694 struct request *rq = bd->rq;
3695 struct work_struct *work = blk_mq_rq_to_pdu(rq);
3697 queue_work(rbd_wq, work);
3701 static void rbd_free_disk(struct rbd_device *rbd_dev)
3703 blk_cleanup_queue(rbd_dev->disk->queue);
3704 blk_mq_free_tag_set(&rbd_dev->tag_set);
3705 put_disk(rbd_dev->disk);
3706 rbd_dev->disk = NULL;
3709 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3710 struct ceph_object_id *oid,
3711 struct ceph_object_locator *oloc,
3712 void *buf, int buf_len)
3715 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3716 struct ceph_osd_request *req;
3717 struct page **pages;
3718 int num_pages = calc_pages_for(0, buf_len);
3721 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
3725 ceph_oid_copy(&req->r_base_oid, oid);
3726 ceph_oloc_copy(&req->r_base_oloc, oloc);
3727 req->r_flags = CEPH_OSD_FLAG_READ;
3729 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
3733 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
3734 if (IS_ERR(pages)) {
3735 ret = PTR_ERR(pages);
3739 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
3740 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
3743 ceph_osdc_start_request(osdc, req, false);
3744 ret = ceph_osdc_wait_request(osdc, req);
3746 ceph_copy_from_page_vector(pages, buf, 0, ret);
3749 ceph_osdc_put_request(req);
3754 * Read the complete header for the given rbd device. On successful
3755 * return, the rbd_dev->header field will contain up-to-date
3756 * information about the image.
3758 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3760 struct rbd_image_header_ondisk *ondisk = NULL;
3767 * The complete header will include an array of its 64-bit
3768 * snapshot ids, followed by the names of those snapshots as
3769 * a contiguous block of NUL-terminated strings. Note that
3770 * the number of snapshots could change by the time we read
3771 * it in, in which case we re-read it.
3778 size = sizeof (*ondisk);
3779 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3781 ondisk = kmalloc(size, GFP_KERNEL);
3785 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
3786 &rbd_dev->header_oloc, ondisk, size);
3789 if ((size_t)ret < size) {
3791 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3795 if (!rbd_dev_ondisk_valid(ondisk)) {
3797 rbd_warn(rbd_dev, "invalid header");
3801 names_size = le64_to_cpu(ondisk->snap_names_len);
3802 want_count = snap_count;
3803 snap_count = le32_to_cpu(ondisk->snap_count);
3804 } while (snap_count != want_count);
3806 ret = rbd_header_from_disk(rbd_dev, ondisk);
3814 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3815 * has disappeared from the (just updated) snapshot context.
3817 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3821 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3824 snap_id = rbd_dev->spec->snap_id;
3825 if (snap_id == CEPH_NOSNAP)
3828 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3829 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3832 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3837 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
3838 * try to update its size. If REMOVING is set, updating size
3839 * is just useless work since the device can't be opened.
3841 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
3842 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
3843 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3844 dout("setting size to %llu sectors", (unsigned long long)size);
3845 set_capacity(rbd_dev->disk, size);
3846 revalidate_disk(rbd_dev->disk);
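/*
 * Worked example, not part of the driver: mapping.size is in bytes
 * while set_capacity() takes 512-byte sectors, so a 1 GiB mapping
 * (1073741824 bytes) is advertised as 2097152 sectors.
 */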
3850 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3855 down_write(&rbd_dev->header_rwsem);
3856 mapping_size = rbd_dev->mapping.size;
3858 ret = rbd_dev_header_info(rbd_dev);
3863 * If there is a parent, see if it has disappeared due to the
3864 * mapped image getting flattened.
3866 if (rbd_dev->parent) {
3867 ret = rbd_dev_v2_parent_info(rbd_dev);
3872 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3873 rbd_dev->mapping.size = rbd_dev->header.image_size;
3875 /* validate mapped snapshot's EXISTS flag */
3876 rbd_exists_validate(rbd_dev);
3880 up_write(&rbd_dev->header_rwsem);
3881 if (!ret && mapping_size != rbd_dev->mapping.size)
3882 rbd_dev_update_size(rbd_dev);
3887 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
3888 unsigned int hctx_idx, unsigned int numa_node)
3890 struct work_struct *work = blk_mq_rq_to_pdu(rq);
3892 INIT_WORK(work, rbd_queue_workfn);
3896 static const struct blk_mq_ops rbd_mq_ops = {
3897 .queue_rq = rbd_queue_rq,
3898 .init_request = rbd_init_request,
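/*
 * Illustrative sketch, not part of the driver: how the blk-mq PDU is
 * used here.  cmd_size in rbd_init_disk() below reserves a
 * work_struct behind every request, so the two sides pair up as
 *
 *   queue_rq:  work = blk_mq_rq_to_pdu(rq);  queue_work(rbd_wq, work);
 *   worker:    rq = blk_mq_rq_from_pdu(work);
 */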
3901 static int rbd_init_disk(struct rbd_device *rbd_dev)
3903 struct gendisk *disk;
3904 struct request_queue *q;
3908 /* create gendisk info */
3909 disk = alloc_disk(single_major ?
3910 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3911 RBD_MINORS_PER_MAJOR);
3915 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3917 disk->major = rbd_dev->major;
3918 disk->first_minor = rbd_dev->minor;
3920 disk->flags |= GENHD_FL_EXT_DEVT;
3921 disk->fops = &rbd_bd_ops;
3922 disk->private_data = rbd_dev;
3924 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
3925 rbd_dev->tag_set.ops = &rbd_mq_ops;
3926 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
3927 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
3928 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
3929 rbd_dev->tag_set.nr_hw_queues = 1;
3930 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
3932 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
3936 q = blk_mq_init_queue(&rbd_dev->tag_set);
3942 blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
3943 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
3945 /* set io sizes to object size */
3946 segment_size = rbd_obj_bytes(&rbd_dev->header);
3947 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3948 q->limits.max_sectors = queue_max_hw_sectors(q);
3949 blk_queue_max_segments(q, USHRT_MAX);
3950 blk_queue_max_segment_size(q, UINT_MAX);
3951 blk_queue_io_min(q, segment_size);
3952 blk_queue_io_opt(q, segment_size);
3954 /* enable the discard support */
3955 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
3956 q->limits.discard_granularity = segment_size;
3957 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
3958 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
3960 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
3961 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
3964 * disk_release() expects a queue ref from add_disk() and will
3965 * put it. Hold an extra ref until add_disk() is called.
3967 WARN_ON(!blk_get_queue(q));
3969 q->queuedata = rbd_dev;
3971 rbd_dev->disk = disk;
3975 blk_mq_free_tag_set(&rbd_dev->tag_set);
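/*
 * Worked example, not part of the driver: with the default 4 MiB
 * object size (order 22), segment_size above is 4194304, so the queue
 * advertises max_hw_sectors == 8192 (512-byte sectors) along with
 * 4 MiB io_min/io_opt hints and a 4 MiB discard granularity.
 */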
3985 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3987 return container_of(dev, struct rbd_device, dev);
3990 static ssize_t rbd_size_show(struct device *dev,
3991 struct device_attribute *attr, char *buf)
3993 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3995 return sprintf(buf, "%llu\n",
3996 (unsigned long long)rbd_dev->mapping.size);
4000 * Note this shows the features for whatever's mapped, which is not
4001 * necessarily the base image.
4003 static ssize_t rbd_features_show(struct device *dev,
4004 struct device_attribute *attr, char *buf)
4006 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4008 return sprintf(buf, "0x%016llx\n",
4009 (unsigned long long)rbd_dev->mapping.features);
4012 static ssize_t rbd_major_show(struct device *dev,
4013 struct device_attribute *attr, char *buf)
4015 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4018 return sprintf(buf, "%d\n", rbd_dev->major);
4020 return sprintf(buf, "(none)\n");
4023 static ssize_t rbd_minor_show(struct device *dev,
4024 struct device_attribute *attr, char *buf)
4026 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4028 return sprintf(buf, "%d\n", rbd_dev->minor);
4031 static ssize_t rbd_client_addr_show(struct device *dev,
4032 struct device_attribute *attr, char *buf)
4034 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4035 struct ceph_entity_addr *client_addr =
4036 ceph_client_addr(rbd_dev->rbd_client->client);
4038 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4039 le32_to_cpu(client_addr->nonce));
4042 static ssize_t rbd_client_id_show(struct device *dev,
4043 struct device_attribute *attr, char *buf)
4045 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4047 return sprintf(buf, "client%lld\n",
4048 ceph_client_gid(rbd_dev->rbd_client->client));
4051 static ssize_t rbd_cluster_fsid_show(struct device *dev,
4052 struct device_attribute *attr, char *buf)
4054 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4056 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4059 static ssize_t rbd_config_info_show(struct device *dev,
4060 struct device_attribute *attr, char *buf)
4062 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4064 return sprintf(buf, "%s\n", rbd_dev->config_info);
4067 static ssize_t rbd_pool_show(struct device *dev,
4068 struct device_attribute *attr, char *buf)
4070 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4072 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4075 static ssize_t rbd_pool_id_show(struct device *dev,
4076 struct device_attribute *attr, char *buf)
4078 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4080 return sprintf(buf, "%llu\n",
4081 (unsigned long long) rbd_dev->spec->pool_id);
4084 static ssize_t rbd_name_show(struct device *dev,
4085 struct device_attribute *attr, char *buf)
4087 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4089 if (rbd_dev->spec->image_name)
4090 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4092 return sprintf(buf, "(unknown)\n");
4095 static ssize_t rbd_image_id_show(struct device *dev,
4096 struct device_attribute *attr, char *buf)
4098 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4100 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4104 * Shows the name of the currently-mapped snapshot (or
4105 * RBD_SNAP_HEAD_NAME for the base image).
4107 static ssize_t rbd_snap_show(struct device *dev,
4108 struct device_attribute *attr,
4111 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4113 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4116 static ssize_t rbd_snap_id_show(struct device *dev,
4117 struct device_attribute *attr, char *buf)
4119 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4121 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
 * For a v2 image, shows the chain of parent images, separated by empty
 * lines.  For v1 images or if there is no parent, shows "(no parent
 * image)".
4129 static ssize_t rbd_parent_show(struct device *dev,
4130 struct device_attribute *attr,
4133 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4136 if (!rbd_dev->parent)
4137 return sprintf(buf, "(no parent image)\n");
4139 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4140 struct rbd_spec *spec = rbd_dev->parent_spec;
4142 count += sprintf(&buf[count], "%s"
4143 "pool_id %llu\npool_name %s\n"
4144 "image_id %s\nimage_name %s\n"
4145 "snap_id %llu\nsnap_name %s\n"
4147 !count ? "" : "\n", /* first? */
4148 spec->pool_id, spec->pool_name,
4149 spec->image_id, spec->image_name ?: "(unknown)",
4150 spec->snap_id, spec->snap_name,
4151 rbd_dev->parent_overlap);
4157 static ssize_t rbd_image_refresh(struct device *dev,
4158 struct device_attribute *attr,
4162 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4165 ret = rbd_dev_refresh(rbd_dev);
4172 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
4173 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
4174 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
4175 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
4176 static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
4177 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
4178 static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
4179 static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
4180 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
4181 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
4182 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
4183 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4184 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4185 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
4186 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
4187 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4189 static struct attribute *rbd_attrs[] = {
4190 &dev_attr_size.attr,
4191 &dev_attr_features.attr,
4192 &dev_attr_major.attr,
4193 &dev_attr_minor.attr,
4194 &dev_attr_client_addr.attr,
4195 &dev_attr_client_id.attr,
4196 &dev_attr_cluster_fsid.attr,
4197 &dev_attr_config_info.attr,
4198 &dev_attr_pool.attr,
4199 &dev_attr_pool_id.attr,
4200 &dev_attr_name.attr,
4201 &dev_attr_image_id.attr,
4202 &dev_attr_current_snap.attr,
4203 &dev_attr_snap_id.attr,
4204 &dev_attr_parent.attr,
4205 &dev_attr_refresh.attr,
4209 static struct attribute_group rbd_attr_group = {
4213 static const struct attribute_group *rbd_attr_groups[] = {
4218 static void rbd_dev_release(struct device *dev);
4220 static const struct device_type rbd_device_type = {
4222 .groups = rbd_attr_groups,
4223 .release = rbd_dev_release,
4226 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4228 kref_get(&spec->kref);
4233 static void rbd_spec_free(struct kref *kref);
4234 static void rbd_spec_put(struct rbd_spec *spec)
4237 kref_put(&spec->kref, rbd_spec_free);
4240 static struct rbd_spec *rbd_spec_alloc(void)
4242 struct rbd_spec *spec;
4244 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4248 spec->pool_id = CEPH_NOPOOL;
4249 spec->snap_id = CEPH_NOSNAP;
4250 kref_init(&spec->kref);
4255 static void rbd_spec_free(struct kref *kref)
4257 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4259 kfree(spec->pool_name);
4260 kfree(spec->image_id);
4261 kfree(spec->image_name);
4262 kfree(spec->snap_name);
4266 static void rbd_dev_free(struct rbd_device *rbd_dev)
4268 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
4269 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
4271 ceph_oid_destroy(&rbd_dev->header_oid);
4272 ceph_oloc_destroy(&rbd_dev->header_oloc);
4273 kfree(rbd_dev->config_info);
4275 rbd_put_client(rbd_dev->rbd_client);
4276 rbd_spec_put(rbd_dev->spec);
4277 kfree(rbd_dev->opts);
4281 static void rbd_dev_release(struct device *dev)
4283 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4284 bool need_put = !!rbd_dev->opts;
4287 destroy_workqueue(rbd_dev->task_wq);
4288 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4291 rbd_dev_free(rbd_dev);
	 * This is racy, but way better than putting the module_put()
	 * outside of the release callback.  The race window is pretty
	 * small, so doing something similar to dm (dm-builtin.c) is
	 * overkill.
4299 module_put(THIS_MODULE);
4302 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4303 struct rbd_spec *spec)
4305 struct rbd_device *rbd_dev;
4307 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4311 spin_lock_init(&rbd_dev->lock);
4312 INIT_LIST_HEAD(&rbd_dev->node);
4313 init_rwsem(&rbd_dev->header_rwsem);
4315 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
4316 ceph_oid_init(&rbd_dev->header_oid);
4317 rbd_dev->header_oloc.pool = spec->pool_id;
4319 mutex_init(&rbd_dev->watch_mutex);
4320 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4321 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4323 init_rwsem(&rbd_dev->lock_rwsem);
4324 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4325 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4326 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4327 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4328 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4329 init_waitqueue_head(&rbd_dev->lock_waitq);
4331 rbd_dev->dev.bus = &rbd_bus_type;
4332 rbd_dev->dev.type = &rbd_device_type;
4333 rbd_dev->dev.parent = &rbd_root_dev;
4334 device_initialize(&rbd_dev->dev);
4336 rbd_dev->rbd_client = rbdc;
4337 rbd_dev->spec = spec;
4343 * Create a mapping rbd_dev.
4345 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4346 struct rbd_spec *spec,
4347 struct rbd_options *opts)
4349 struct rbd_device *rbd_dev;
4351 rbd_dev = __rbd_dev_create(rbdc, spec);
4355 rbd_dev->opts = opts;
4357 /* get an id and fill in device name */
4358 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4359 minor_to_rbd_dev_id(1 << MINORBITS),
4361 if (rbd_dev->dev_id < 0)
4364 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4365 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4367 if (!rbd_dev->task_wq)
4370 /* we have a ref from do_rbd_add() */
4371 __module_get(THIS_MODULE);
4373 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
4377 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4379 rbd_dev_free(rbd_dev);
4383 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4386 put_device(&rbd_dev->dev);
 * Get the size and object order for an image snapshot, or, if
 * snap_id is CEPH_NOSNAP, get this information for the base
 * image.
4394 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4395 u8 *order, u64 *snap_size)
4397 __le64 snapid = cpu_to_le64(snap_id);
4402 } __attribute__ ((packed)) size_buf = { 0 };
4404 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4405 &rbd_dev->header_oloc, "get_size",
4406 &snapid, sizeof(snapid),
4407 &size_buf, sizeof(size_buf));
4408 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4411 if (ret < sizeof (size_buf))
4415 *order = size_buf.order;
4416 dout(" order %u", (unsigned int)*order);
4418 *snap_size = le64_to_cpu(size_buf.size);
4420 dout(" snap_id 0x%016llx snap_size = %llu\n",
4421 (unsigned long long)snap_id,
4422 (unsigned long long)*snap_size);
4427 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4429 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4430 &rbd_dev->header.obj_order,
4431 &rbd_dev->header.image_size);
4434 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4440 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4444 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4445 &rbd_dev->header_oloc, "get_object_prefix",
4446 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4447 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4452 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4453 p + ret, NULL, GFP_NOIO);
4456 if (IS_ERR(rbd_dev->header.object_prefix)) {
4457 ret = PTR_ERR(rbd_dev->header.object_prefix);
4458 rbd_dev->header.object_prefix = NULL;
4460 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4468 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4471 __le64 snapid = cpu_to_le64(snap_id);
4475 } __attribute__ ((packed)) features_buf = { 0 };
4479 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4480 &rbd_dev->header_oloc, "get_features",
4481 &snapid, sizeof(snapid),
4482 &features_buf, sizeof(features_buf));
4483 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4486 if (ret < sizeof (features_buf))
4489 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4491 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4496 *snap_features = le64_to_cpu(features_buf.features);
4498 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4499 (unsigned long long)snap_id,
4500 (unsigned long long)*snap_features,
4501 (unsigned long long)le64_to_cpu(features_buf.incompat));
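/*
 * Worked example, not part of the driver: an image with incompat ==
 * 0x1d (layering, exclusive-lock, plus bits 3 and 4) yields unsup ==
 * 0x18 above, since bits 3 and 4 (object-map/fast-diff upstream) are
 * not in RBD_FEATURES_SUPPORTED, and the map attempt is refused.
 */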
4506 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4508 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4509 &rbd_dev->header.features);
4512 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4514 struct rbd_spec *parent_spec;
4516 void *reply_buf = NULL;
4526 parent_spec = rbd_spec_alloc();
4530 size = sizeof (__le64) + /* pool_id */
4531 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
4532 sizeof (__le64) + /* snap_id */
4533 sizeof (__le64); /* overlap */
4534 reply_buf = kmalloc(size, GFP_KERNEL);
4540 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4541 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4542 &rbd_dev->header_oloc, "get_parent",
4543 &snapid, sizeof(snapid), reply_buf, size);
4544 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4549 end = reply_buf + ret;
4551 ceph_decode_64_safe(&p, end, pool_id, out_err);
4552 if (pool_id == CEPH_NOPOOL) {
		 * Either the parent never existed, or we have a
		 * record of it but the image got flattened so it no
		 * longer has a parent.  When the parent of a
		 * layered image disappears we immediately set the
		 * overlap to 0.  The effect of this is that all new
		 * requests will be treated as if the image had no
		 * parent.
4562 if (rbd_dev->parent_overlap) {
4563 rbd_dev->parent_overlap = 0;
4564 rbd_dev_parent_put(rbd_dev);
4565 pr_info("%s: clone image has been flattened\n",
4566 rbd_dev->disk->disk_name);
4569 goto out; /* No parent? No problem. */
4572 /* The ceph file layout needs to fit pool id in 32 bits */
4575 if (pool_id > (u64)U32_MAX) {
4576 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
4577 (unsigned long long)pool_id, U32_MAX);
4581 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4582 if (IS_ERR(image_id)) {
4583 ret = PTR_ERR(image_id);
4586 ceph_decode_64_safe(&p, end, snap_id, out_err);
4587 ceph_decode_64_safe(&p, end, overlap, out_err);
	 * The parent won't change (except when the clone is
	 * flattened, which is already handled above), so we only need
	 * to record the parent spec if we have not already done so.
4594 if (!rbd_dev->parent_spec) {
4595 parent_spec->pool_id = pool_id;
4596 parent_spec->image_id = image_id;
4597 parent_spec->snap_id = snap_id;
4598 rbd_dev->parent_spec = parent_spec;
4599 parent_spec = NULL; /* rbd_dev now owns this */
	 * We always update the parent overlap.  If it's zero we issue
	 * a warning, as we will proceed as if there were no parent.
4610 /* refresh, careful to warn just once */
4611 if (rbd_dev->parent_overlap)
4613 "clone now standalone (overlap became 0)");
4616 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
4619 rbd_dev->parent_overlap = overlap;
4625 rbd_spec_put(parent_spec);
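/*
 * Illustrative layout, not part of the driver: the "get_parent" reply
 * decoded above --
 *
 *   le64 pool_id    (CEPH_NOPOOL when there is no parent)
 *   string image_id (le32 length + bytes)
 *   le64 snap_id
 *   le64 overlap    (bytes of the clone backed by the parent)
 */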
4630 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4634 __le64 stripe_count;
4635 } __attribute__ ((packed)) striping_info_buf = { 0 };
4636 size_t size = sizeof (striping_info_buf);
4640 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4641 &rbd_dev->header_oloc, "get_stripe_unit_count",
4642 NULL, 0, &striping_info_buf, size);
4643 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4649 p = &striping_info_buf;
4650 rbd_dev->header.stripe_unit = ceph_decode_64(&p);
4651 rbd_dev->header.stripe_count = ceph_decode_64(&p);
4655 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
4657 __le64 data_pool_id;
4660 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4661 &rbd_dev->header_oloc, "get_data_pool",
4662 NULL, 0, &data_pool_id, sizeof(data_pool_id));
4665 if (ret < sizeof(data_pool_id))
4668 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
4669 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
4673 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4675 CEPH_DEFINE_OID_ONSTACK(oid);
4676 size_t image_id_size;
4681 void *reply_buf = NULL;
4683 char *image_name = NULL;
4686 rbd_assert(!rbd_dev->spec->image_name);
4688 len = strlen(rbd_dev->spec->image_id);
4689 image_id_size = sizeof (__le32) + len;
4690 image_id = kmalloc(image_id_size, GFP_KERNEL);
4695 end = image_id + image_id_size;
4696 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4698 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4699 reply_buf = kmalloc(size, GFP_KERNEL);
4703 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
4704 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
4705 "dir_get_name", image_id, image_id_size,
4710 end = reply_buf + ret;
4712 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4713 if (IS_ERR(image_name))
4716 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4724 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4726 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4727 const char *snap_name;
4730 /* Skip over names until we find the one we are looking for */
4732 snap_name = rbd_dev->header.snap_names;
4733 while (which < snapc->num_snaps) {
4734 if (!strcmp(name, snap_name))
4735 return snapc->snaps[which];
4736 snap_name += strlen(snap_name) + 1;
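/*
 * Worked example, not part of the driver: for snapshots "a" and "bb",
 * header.snap_names is the packed block "a\0bb\0", so the
 * strlen() + 1 stride above steps from "a" to "bb" while 'which'
 * tracks the matching id in snapc->snaps[].
 */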
4742 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4744 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4749 for (which = 0; !found && which < snapc->num_snaps; which++) {
4750 const char *snap_name;
4752 snap_id = snapc->snaps[which];
4753 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4754 if (IS_ERR(snap_name)) {
4755 /* ignore no-longer existing snapshots */
4756 if (PTR_ERR(snap_name) == -ENOENT)
4761 found = !strcmp(name, snap_name);
4764 return found ? snap_id : CEPH_NOSNAP;
4768 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4769 * no snapshot by that name is found, or if an error occurs.
4771 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4773 if (rbd_dev->image_format == 1)
4774 return rbd_v1_snap_id_by_name(rbd_dev, name);
4776 return rbd_v2_snap_id_by_name(rbd_dev, name);
4780 * An image being mapped will have everything but the snap id.
4782 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4784 struct rbd_spec *spec = rbd_dev->spec;
4786 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4787 rbd_assert(spec->image_id && spec->image_name);
4788 rbd_assert(spec->snap_name);
4790 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4793 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4794 if (snap_id == CEPH_NOSNAP)
4797 spec->snap_id = snap_id;
4799 spec->snap_id = CEPH_NOSNAP;
4806 * A parent image will have all ids but none of the names.
4808 * All names in an rbd spec are dynamically allocated. It's OK if we
4809 * can't figure out the name for an image id.
4811 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
4813 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4814 struct rbd_spec *spec = rbd_dev->spec;
4815 const char *pool_name;
4816 const char *image_name;
4817 const char *snap_name;
4820 rbd_assert(spec->pool_id != CEPH_NOPOOL);
4821 rbd_assert(spec->image_id);
4822 rbd_assert(spec->snap_id != CEPH_NOSNAP);
4824 /* Get the pool name; we have to make our own copy of this */
4826 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4828 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4831 pool_name = kstrdup(pool_name, GFP_KERNEL);
4835 /* Fetch the image name; tolerate failure here */
4837 image_name = rbd_dev_image_name(rbd_dev);
4839 rbd_warn(rbd_dev, "unable to get image name");
4841 /* Fetch the snapshot name */
4843 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4844 if (IS_ERR(snap_name)) {
4845 ret = PTR_ERR(snap_name);
4849 spec->pool_name = pool_name;
4850 spec->image_name = image_name;
4851 spec->snap_name = snap_name;
4861 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4870 struct ceph_snap_context *snapc;
4874 * We'll need room for the seq value (maximum snapshot id),
4875 * snapshot count, and array of that many snapshot ids.
4876 * For now we have a fixed upper limit on the number we're
4877 * prepared to receive.
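 *
 * As an illustrative sketch, the reply decoded below looks like
 * this on the wire (field names here are ours, not authoritative):
 *
 *	__le64 seq;                   maximum snapshot id
 *	__le32 snap_count;
 *	__le64 snaps[snap_count];     one id per snapshot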
4879 size = sizeof (__le64) + sizeof (__le32) +
4880 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4881 reply_buf = kzalloc(size, GFP_KERNEL);
4885 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4886 &rbd_dev->header_oloc, "get_snapcontext",
4887 NULL, 0, reply_buf, size);
4888 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4893 end = reply_buf + ret;
4895 ceph_decode_64_safe(&p, end, seq, out);
4896 ceph_decode_32_safe(&p, end, snap_count, out);
4899 * Make sure the reported number of snapshot ids wouldn't go
4900 * beyond the end of our buffer. But before checking that,
4901 * make sure the computed size of the snapshot context we
4902 * allocate is representable in a size_t.
4904 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4909 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4913 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4919 for (i = 0; i < snap_count; i++)
4920 snapc->snaps[i] = ceph_decode_64(&p);
4922 ceph_put_snap_context(rbd_dev->header.snapc);
4923 rbd_dev->header.snapc = snapc;
4925 dout(" snap context seq = %llu, snap_count = %u\n",
4926 (unsigned long long)seq, (unsigned int)snap_count);
4933 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4944 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4945 reply_buf = kmalloc(size, GFP_KERNEL);
4947 return ERR_PTR(-ENOMEM);
4949 snapid = cpu_to_le64(snap_id);
4950 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4951 &rbd_dev->header_oloc, "get_snapshot_name",
4952 &snapid, sizeof(snapid), reply_buf, size);
4953 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4955 snap_name = ERR_PTR(ret);
4960 end = reply_buf + ret;
4961 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4962 if (IS_ERR(snap_name))
4965 dout(" snap_id 0x%016llx snap_name = %s\n",
4966 (unsigned long long)snap_id, snap_name);
4973 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4975 bool first_time = rbd_dev->header.object_prefix == NULL;
4978 ret = rbd_dev_v2_image_size(rbd_dev);
4983 ret = rbd_dev_v2_header_onetime(rbd_dev);
4988 ret = rbd_dev_v2_snap_context(rbd_dev);
4989 if (ret && first_time) {
4990 kfree(rbd_dev->header.object_prefix);
4991 rbd_dev->header.object_prefix = NULL;
4997 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4999 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5001 if (rbd_dev->image_format == 1)
5002 return rbd_dev_v1_header_info(rbd_dev);
5004 return rbd_dev_v2_header_info(rbd_dev);
5008 * Skips over white space at *buf, and updates *buf to point to the
5009 * first found non-space character (if any). Returns the length of
5010 * the token (string of non-white space characters) found. Note
5011 * that *buf must be terminated with '\0'.
5013 static inline size_t next_token(const char **buf)
5016 * These are the characters that produce nonzero for
5017 * isspace() in the "C" and "POSIX" locales.
5019 const char *spaces = " \f\n\r\t\v";
5021 *buf += strspn(*buf, spaces); /* Find start of token */
5023 return strcspn(*buf, spaces); /* Return token length */
5027 * Finds the next token in *buf, dynamically allocates a buffer big
5028 * enough to hold a copy of it, and copies the token into the new
5029 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5030 * that a duplicate buffer is created even for a zero-length token.
5032 * Returns a pointer to the newly-allocated duplicate, or a null
5033 * pointer if memory for the duplicate was not available. If
5034 * the lenp argument is a non-null pointer, the length of the token
5035 * (not including the '\0') is returned in *lenp.
5037 * If successful, the *buf pointer will be updated to point beyond
5038 * the end of the found token.
5040 * Note: uses GFP_KERNEL for allocation.
5042 static inline char *dup_token(const char **buf, size_t *lenp)
5047 len = next_token(buf);
5048 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5051 *(dup + len) = '\0';
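/*
 * Illustrative example (values hypothetical): with *buf pointing at
 * "  pool image", next_token() advances *buf past the leading spaces
 * and returns 4 (the length of "pool"); dup_token() then returns a
 * freshly allocated "pool" and leaves *buf pointing at " image".
 */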
5061 * Parse the options provided for an "rbd add" (i.e., rbd image
5062 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5063 * and the data written is passed here via a NUL-terminated buffer.
5064 * Returns 0 if successful or an error code otherwise.
5066 * The information extracted from these options is recorded in
5067	 * the other parameters, which return dynamically-allocated structures:
5070 * The address of a pointer that will refer to a ceph options
5071 * structure. Caller must release the returned pointer using
5072 * ceph_destroy_options() when it is no longer needed.
5074 * Address of an rbd options pointer. Fully initialized by
5075 * this function; caller must release with kfree().
5077 * Address of an rbd image specification pointer. Fully
5078 * initialized by this function based on parsed options.
5079 * Caller must release with rbd_spec_put().
5081 * The options passed take this form:
5082	 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5085 * A comma-separated list of one or more monitor addresses.
5086 * A monitor address is an ip address, optionally followed
5087 * by a port number (separated by a colon).
5088 * I.e.: ip1[:port1][,ip2[:port2]...]
5090 * A comma-separated list of ceph and/or rbd options.
5092 * The name of the rados pool containing the rbd image.
5094 * The name of the image in that pool to map.
5096	 *      An optional snapshot name.  If provided, the mapping will
5097	 *      present data from the image at the time that snapshot was
5098	 *      created.  The image head is used if no snapshot name is
5099	 *      provided.  Snapshot mappings are always read-only.
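 *
 * For example, a complete mapping request might look like this
 * (monitor address, credentials and names are illustrative only):
 *
 *	$ echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add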
5101 static int rbd_add_parse_args(const char *buf,
5102 struct ceph_options **ceph_opts,
5103 struct rbd_options **opts,
5104 struct rbd_spec **rbd_spec)
5108 const char *mon_addrs;
5110 size_t mon_addrs_size;
5111 struct rbd_spec *spec = NULL;
5112 struct rbd_options *rbd_opts = NULL;
5113 struct ceph_options *copts;
5116 /* The first four tokens are required */
5118 len = next_token(&buf);
5120 rbd_warn(NULL, "no monitor address(es) provided");
5124 mon_addrs_size = len + 1;
5128 options = dup_token(&buf, NULL);
5132 rbd_warn(NULL, "no options provided");
5136 spec = rbd_spec_alloc();
5140 spec->pool_name = dup_token(&buf, NULL);
5141 if (!spec->pool_name)
5143 if (!*spec->pool_name) {
5144 rbd_warn(NULL, "no pool name provided");
5148 spec->image_name = dup_token(&buf, NULL);
5149 if (!spec->image_name)
5151 if (!*spec->image_name) {
5152 rbd_warn(NULL, "no image name provided");
5157 * Snapshot name is optional; default is to use "-"
5158 * (indicating the head/no snapshot).
5160 len = next_token(&buf);
5162 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5163 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5164 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5165 ret = -ENAMETOOLONG;
5168 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5171 *(snap_name + len) = '\0';
5172 spec->snap_name = snap_name;
5174 /* Initialize all rbd options to the defaults */
5176 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5180 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5181 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5182 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5183 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5185 copts = ceph_parse_options(options, mon_addrs,
5186 mon_addrs + mon_addrs_size - 1,
5187 parse_rbd_opts_token, rbd_opts);
5188 if (IS_ERR(copts)) {
5189 ret = PTR_ERR(copts);
5209 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5211 down_write(&rbd_dev->lock_rwsem);
5212 if (__rbd_is_lock_owner(rbd_dev))
5213 rbd_unlock(rbd_dev);
5214 up_write(&rbd_dev->lock_rwsem);
5217 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5219 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5220 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5224	/* FIXME: "rbd map --exclusive" should be interruptible */
5225 down_read(&rbd_dev->lock_rwsem);
5226 rbd_wait_state_locked(rbd_dev);
5227 up_read(&rbd_dev->lock_rwsem);
5228 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5229 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5237 * An rbd format 2 image has a unique identifier, distinct from the
5238 * name given to it by the user. Internally, that identifier is
5239 * what's used to specify the names of objects related to the image.
5241 * A special "rbd id" object is used to map an rbd image name to its
5242 * id. If that object doesn't exist, then there is no v2 rbd image
5243 * with the supplied name.
5245	 * This function will fill in the given rbd_dev's image_id field if
5246 * it can be determined, and in that case will return 0. If any
5247 * errors occur a negative errno will be returned and the rbd_dev's
5248 * image_id field will be unchanged (and should be NULL).
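 *
 * For example (names and ids illustrative): an image named "foo"
 * has an id object named "rbd_id.foo" (RBD_ID_PREFIX followed by
 * the image name), whose contents decode to an id string such as
 * "1028d4dcd5bd".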
5250 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5254 CEPH_DEFINE_OID_ONSTACK(oid);
5259 * When probing a parent image, the image id is already
5260 * known (and the image name likely is not). There's no
5261 * need to fetch the image id again in this case. We
5262 * do still need to set the image format though.
5264 if (rbd_dev->spec->image_id) {
5265 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5271	 * First, see if the format 2 image id object exists, and if
5272	 * so, get the image's persistent id from it.
5274 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5275 rbd_dev->spec->image_name);
5279 dout("rbd id object name is %s\n", oid.name);
5281 /* Response will be an encoded string, which includes a length */
5283 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5284 response = kzalloc(size, GFP_NOIO);
5290 /* If it doesn't exist we'll assume it's a format 1 image */
5292 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5294 response, RBD_IMAGE_ID_LEN_MAX);
5295 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5296 if (ret == -ENOENT) {
5297 image_id = kstrdup("", GFP_KERNEL);
5298 ret = image_id ? 0 : -ENOMEM;
5300 rbd_dev->image_format = 1;
5301 } else if (ret >= 0) {
5304 image_id = ceph_extract_encoded_string(&p, p + ret,
5306 ret = PTR_ERR_OR_ZERO(image_id);
5308 rbd_dev->image_format = 2;
5312 rbd_dev->spec->image_id = image_id;
5313 dout("image_id is %s\n", image_id);
5317 ceph_oid_destroy(&oid);
5322	 * Undo whatever state changes are made by v1 or v2 header info routines.
5325 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5327 struct rbd_image_header *header;
5329 rbd_dev_parent_put(rbd_dev);
5331 /* Free dynamic fields from the header, then zero it out */
5333 header = &rbd_dev->header;
5334 ceph_put_snap_context(header->snapc);
5335 kfree(header->snap_sizes);
5336 kfree(header->snap_names);
5337 kfree(header->object_prefix);
5338 memset(header, 0, sizeof (*header));
5341 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5345 ret = rbd_dev_v2_object_prefix(rbd_dev);
5350	 * Get and check the features for the image.  Currently the
5351 * features are assumed to never change.
5353 ret = rbd_dev_v2_features(rbd_dev);
5357 /* If the image supports fancy striping, get its parameters */
5359 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5360 ret = rbd_dev_v2_striping_info(rbd_dev);
5365 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5366 ret = rbd_dev_v2_data_pool(rbd_dev);
5371 rbd_init_layout(rbd_dev);
5375 rbd_dev->header.features = 0;
5376 kfree(rbd_dev->header.object_prefix);
5377 rbd_dev->header.object_prefix = NULL;
5382 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5383 * rbd_dev_image_probe() recursion depth, which means it's also the
5384 * length of the already discovered part of the parent chain.
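 *
 * For example, mapping a clone of a clone probes with depth 0, 1 and
 * then 2; the chain is capped at RBD_MAX_PARENT_CHAIN_LEN (16).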
5386 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5388 struct rbd_device *parent = NULL;
5391 if (!rbd_dev->parent_spec)
5394 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5395 pr_info("parent chain is too long (%d)\n", depth);
5400 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5407 * Images related by parent/child relationships always share
5408 * rbd_client and spec/parent_spec, so bump their refcounts.
5410 __rbd_get_client(rbd_dev->rbd_client);
5411 rbd_spec_get(rbd_dev->parent_spec);
5413 ret = rbd_dev_image_probe(parent, depth);
5417 rbd_dev->parent = parent;
5418 atomic_set(&rbd_dev->parent_ref, 1);
5422 rbd_dev_unparent(rbd_dev);
5423 rbd_dev_destroy(parent);
5427 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5429 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5430 rbd_dev_mapping_clear(rbd_dev);
5431 rbd_free_disk(rbd_dev);
5433 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5437	 * rbd_dev->header_rwsem must be locked for write and will be unlocked upon return.
5440 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5444 /* Record our major and minor device numbers. */
5446 if (!single_major) {
5447 ret = register_blkdev(0, rbd_dev->name);
5449 goto err_out_unlock;
5451 rbd_dev->major = ret;
5454 rbd_dev->major = rbd_major;
5455 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5458 /* Set up the blkdev mapping. */
5460 ret = rbd_init_disk(rbd_dev);
5462 goto err_out_blkdev;
5464 ret = rbd_dev_mapping_set(rbd_dev);
5468 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5469 set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
5471 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
5473 goto err_out_mapping;
5475 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5476 up_write(&rbd_dev->header_rwsem);
5480 rbd_dev_mapping_clear(rbd_dev);
5482 rbd_free_disk(rbd_dev);
5485 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5487 up_write(&rbd_dev->header_rwsem);
5491 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5493 struct rbd_spec *spec = rbd_dev->spec;
5496 /* Record the header object name for this rbd image. */
5498 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5499 if (rbd_dev->image_format == 1)
5500 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5501 spec->image_name, RBD_SUFFIX);
5503 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
5504 RBD_HEADER_PREFIX, spec->image_id);
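/*
 * E.g. (names illustrative): a format 1 image called "foo" uses header
 * object "foo.rbd" (image name plus RBD_SUFFIX), while a format 2
 * image with id "1028d4dcd5bd" uses "rbd_header.1028d4dcd5bd"
 * (RBD_HEADER_PREFIX plus image id), matching the two branches above.
 */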
5509 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5511 rbd_dev_unprobe(rbd_dev);
5513 rbd_unregister_watch(rbd_dev);
5514 rbd_dev->image_format = 0;
5515 kfree(rbd_dev->spec->image_id);
5516 rbd_dev->spec->image_id = NULL;
5520 * Probe for the existence of the header object for the given rbd
5521 * device. If this image is the one being mapped (i.e., not a
5522 * parent), initiate a watch on its header object before using that
5523 * object to get detailed information about the rbd image.
5525 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
5530 * Get the id from the image id object. Unless there's an
5531 * error, rbd_dev->spec->image_id will be filled in with
5532 * a dynamically-allocated string, and rbd_dev->image_format
5533 * will be set to either 1 or 2.
5535 ret = rbd_dev_image_id(rbd_dev);
5539 ret = rbd_dev_header_name(rbd_dev);
5541 goto err_out_format;
5544 ret = rbd_register_watch(rbd_dev);
5547 pr_info("image %s/%s does not exist\n",
5548 rbd_dev->spec->pool_name,
5549 rbd_dev->spec->image_name);
5550 goto err_out_format;
5554 ret = rbd_dev_header_info(rbd_dev);
5559 * If this image is the one being mapped, we have pool name and
5560 * id, image name and id, and snap name - need to fill snap id.
5561 * Otherwise this is a parent image, identified by pool, image
5562 * and snap ids - need to fill in names for those ids.
5565 ret = rbd_spec_fill_snap_id(rbd_dev);
5567 ret = rbd_spec_fill_names(rbd_dev);
5570 pr_info("snap %s/%s@%s does not exist\n",
5571 rbd_dev->spec->pool_name,
5572 rbd_dev->spec->image_name,
5573 rbd_dev->spec->snap_name);
5577 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5578 ret = rbd_dev_v2_parent_info(rbd_dev);
5583 * Need to warn users if this image is the one being
5584 * mapped and has a parent.
5586 if (!depth && rbd_dev->parent_spec)
5588 "WARNING: kernel layering is EXPERIMENTAL!");
5591 ret = rbd_dev_probe_parent(rbd_dev, depth);
5595 dout("discovered format %u image, header name is %s\n",
5596 rbd_dev->image_format, rbd_dev->header_oid.name);
5600 rbd_dev_unprobe(rbd_dev);
5603 rbd_unregister_watch(rbd_dev);
5605 rbd_dev->image_format = 0;
5606 kfree(rbd_dev->spec->image_id);
5607 rbd_dev->spec->image_id = NULL;
5611 static ssize_t do_rbd_add(struct bus_type *bus,
5615 struct rbd_device *rbd_dev = NULL;
5616 struct ceph_options *ceph_opts = NULL;
5617 struct rbd_options *rbd_opts = NULL;
5618 struct rbd_spec *spec = NULL;
5619 struct rbd_client *rbdc;
5622 if (!try_module_get(THIS_MODULE))
5625 /* parse add command */
5626 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5630 rbdc = rbd_get_client(ceph_opts);
5637 rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
5640 pr_info("pool %s does not exist\n", spec->pool_name);
5641 goto err_out_client;
5643 spec->pool_id = (u64)rc;
5645 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
5648 goto err_out_client;
5650 rbdc = NULL; /* rbd_dev now owns this */
5651 spec = NULL; /* rbd_dev now owns this */
5652 rbd_opts = NULL; /* rbd_dev now owns this */
5654 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
5655 if (!rbd_dev->config_info) {
5657 goto err_out_rbd_dev;
5660 down_write(&rbd_dev->header_rwsem);
5661 rc = rbd_dev_image_probe(rbd_dev, 0);
5663 up_write(&rbd_dev->header_rwsem);
5664 goto err_out_rbd_dev;
5667 /* If we are mapping a snapshot it must be marked read-only */
5668 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5669 rbd_dev->opts->read_only = true;
5671 rc = rbd_dev_device_setup(rbd_dev);
5673 goto err_out_image_probe;
5675 if (rbd_dev->opts->exclusive) {
5676 rc = rbd_add_acquire_lock(rbd_dev);
5678 goto err_out_device_setup;
5681 /* Everything's ready. Announce the disk to the world. */
5683 rc = device_add(&rbd_dev->dev);
5685 goto err_out_image_lock;
5687 add_disk(rbd_dev->disk);
5688 /* see rbd_init_disk() */
5689 blk_put_queue(rbd_dev->disk->queue);
5691 spin_lock(&rbd_dev_list_lock);
5692 list_add_tail(&rbd_dev->node, &rbd_dev_list);
5693 spin_unlock(&rbd_dev_list_lock);
5695 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
5696 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
5697 rbd_dev->header.features);
5700 module_put(THIS_MODULE);
5704 rbd_dev_image_unlock(rbd_dev);
5705 err_out_device_setup:
5706 rbd_dev_device_release(rbd_dev);
5707 err_out_image_probe:
5708 rbd_dev_image_release(rbd_dev);
5710 rbd_dev_destroy(rbd_dev);
5712 rbd_put_client(rbdc);
5719 static ssize_t rbd_add(struct bus_type *bus,
5726 return do_rbd_add(bus, buf, count);
5729 static ssize_t rbd_add_single_major(struct bus_type *bus,
5733 return do_rbd_add(bus, buf, count);
5736 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5738 while (rbd_dev->parent) {
5739 struct rbd_device *first = rbd_dev;
5740 struct rbd_device *second = first->parent;
5741 struct rbd_device *third;
5744		 * Follow to the parent with no grandparent and remove it.
5747 while (second && (third = second->parent)) {
5752 rbd_dev_image_release(second);
5753 rbd_dev_destroy(second);
5754 first->parent = NULL;
5755 first->parent_overlap = 0;
5757 rbd_assert(first->parent_spec);
5758 rbd_spec_put(first->parent_spec);
5759 first->parent_spec = NULL;
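/*
 * do_rbd_remove() handles a write to /sys/bus/rbd/remove (or its
 * single-major variant): a device id optionally followed by "force",
 * e.g. (illustrative):
 *
 *	$ echo "0 force" > /sys/bus/rbd/remove
 */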
5763 static ssize_t do_rbd_remove(struct bus_type *bus,
5767 struct rbd_device *rbd_dev = NULL;
5768 struct list_head *tmp;
5771 bool already = false;
5777 sscanf(buf, "%d %5s", &dev_id, opt_buf);
5779 pr_err("dev_id out of range\n");
5782 if (opt_buf[0] != '\0') {
5783 if (!strcmp(opt_buf, "force")) {
5786 pr_err("bad remove option at '%s'\n", opt_buf);
5792 spin_lock(&rbd_dev_list_lock);
5793 list_for_each(tmp, &rbd_dev_list) {
5794 rbd_dev = list_entry(tmp, struct rbd_device, node);
5795 if (rbd_dev->dev_id == dev_id) {
5801 spin_lock_irq(&rbd_dev->lock);
5802 if (rbd_dev->open_count && !force)
5805 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5807 spin_unlock_irq(&rbd_dev->lock);
5809 spin_unlock(&rbd_dev_list_lock);
5810 if (ret < 0 || already)
5815 * Prevent new IO from being queued and wait for existing
5816 * IO to complete/fail.
5818 blk_mq_freeze_queue(rbd_dev->disk->queue);
5819 blk_set_queue_dying(rbd_dev->disk->queue);
5822 del_gendisk(rbd_dev->disk);
5823 spin_lock(&rbd_dev_list_lock);
5824 list_del_init(&rbd_dev->node);
5825 spin_unlock(&rbd_dev_list_lock);
5826 device_del(&rbd_dev->dev);
5828 rbd_dev_image_unlock(rbd_dev);
5829 rbd_dev_device_release(rbd_dev);
5830 rbd_dev_image_release(rbd_dev);
5831 rbd_dev_destroy(rbd_dev);
5835 static ssize_t rbd_remove(struct bus_type *bus,
5842 return do_rbd_remove(bus, buf, count);
5845 static ssize_t rbd_remove_single_major(struct bus_type *bus,
5849 return do_rbd_remove(bus, buf, count);
5853 * create control files in sysfs
5856 static int rbd_sysfs_init(void)
5860 ret = device_register(&rbd_root_dev);
5864 ret = bus_register(&rbd_bus_type);
5866 device_unregister(&rbd_root_dev);
5871 static void rbd_sysfs_cleanup(void)
5873 bus_unregister(&rbd_bus_type);
5874 device_unregister(&rbd_root_dev);
5877 static int rbd_slab_init(void)
5879 rbd_assert(!rbd_img_request_cache);
5880 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
5881 if (!rbd_img_request_cache)
5884 rbd_assert(!rbd_obj_request_cache);
5885 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
5886 if (!rbd_obj_request_cache)
5892 kmem_cache_destroy(rbd_img_request_cache);
5893 rbd_img_request_cache = NULL;
5897 static void rbd_slab_exit(void)
5899 rbd_assert(rbd_obj_request_cache);
5900 kmem_cache_destroy(rbd_obj_request_cache);
5901 rbd_obj_request_cache = NULL;
5903 rbd_assert(rbd_img_request_cache);
5904 kmem_cache_destroy(rbd_img_request_cache);
5905 rbd_img_request_cache = NULL;
5908 static int __init rbd_init(void)
5912 if (!libceph_compatible(NULL)) {
5913 rbd_warn(NULL, "libceph incompatibility (quitting)");
5917 rc = rbd_slab_init();
5922 * The number of active work items is limited by the number of
5923 * rbd devices * queue depth, so leave @max_active at default.
5925 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
5932 rbd_major = register_blkdev(0, RBD_DRV_NAME);
5933 if (rbd_major < 0) {
5939 rc = rbd_sysfs_init();
5941 goto err_out_blkdev;
5944 pr_info("loaded (major %d)\n", rbd_major);
5946 pr_info("loaded\n");
5952 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5954 destroy_workqueue(rbd_wq);
5960 static void __exit rbd_exit(void)
5962 ida_destroy(&rbd_dev_id_ida);
5963 rbd_sysfs_cleanup();
5965 unregister_blkdev(rbd_major, RBD_DRV_NAME);
5966 destroy_workqueue(rbd_wq);
5970 module_init(rbd_init);
5971 module_exit(rbd_exit);
5973 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
5974 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5975 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5976 /* following authorship retained from original osdblk.c */
5977 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5979 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
5980 MODULE_LICENSE("GPL");