1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/striper.h>
36 #include <linux/ceph/decode.h>
37 #include <linux/parser.h>
38 #include <linux/bsearch.h>
39
40 #include <linux/kernel.h>
41 #include <linux/device.h>
42 #include <linux/module.h>
43 #include <linux/blk-mq.h>
44 #include <linux/fs.h>
45 #include <linux/blkdev.h>
46 #include <linux/slab.h>
47 #include <linux/idr.h>
48 #include <linux/workqueue.h>
49
50 #include "rbd_types.h"
51
52 #define RBD_DEBUG       /* Activate rbd_assert() calls */
53
54 /*
55  * Increment the given counter and return its updated value.
56  * If the counter is already 0, it will not be incremented.
57  * If the counter is already at its maximum value, returns
58  * -EINVAL without updating it.
59  */
60 static int atomic_inc_return_safe(atomic_t *v)
61 {
62         unsigned int counter;
63
64         counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
65         if (counter <= (unsigned int)INT_MAX)
66                 return (int)counter;
67
68         atomic_dec(v);
69
70         return -EINVAL;
71 }
72
73 /* Decrement the counter.  Return the resulting value, or -EINVAL */
74 static int atomic_dec_return_safe(atomic_t *v)
75 {
76         int counter;
77
78         counter = atomic_dec_return(v);
79         if (counter >= 0)
80                 return counter;
81
82         atomic_inc(v);
83
84         return -EINVAL;
85 }
86
87 #define RBD_DRV_NAME "rbd"
88
89 #define RBD_MINORS_PER_MAJOR            256
90 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
91
92 #define RBD_MAX_PARENT_CHAIN_LEN        16
93
94 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
95 #define RBD_MAX_SNAP_NAME_LEN   \
96                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
97
98 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
99
100 #define RBD_SNAP_HEAD_NAME      "-"
101
102 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
103
104 /* This allows a single page to hold an image name sent by OSD */
105 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
106 #define RBD_IMAGE_ID_LEN_MAX    64
107
108 #define RBD_OBJ_PREFIX_LEN_MAX  64
109
110 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
111 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
112
113 /* Feature bits */
114
115 #define RBD_FEATURE_LAYERING            (1ULL<<0)
116 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
117 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
118 #define RBD_FEATURE_OBJECT_MAP          (1ULL<<3)
119 #define RBD_FEATURE_FAST_DIFF           (1ULL<<4)
120 #define RBD_FEATURE_DEEP_FLATTEN        (1ULL<<5)
121 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
122 #define RBD_FEATURE_OPERATIONS          (1ULL<<8)
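
/*
 * Note: bit 6 is deliberately left unused here; in userspace librbd it
 * corresponds to journaling, which this driver does not implement
 * (assumption based on the userspace feature-bit layout).
 */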
123
124 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
125                                  RBD_FEATURE_STRIPINGV2 |       \
126                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
127                                  RBD_FEATURE_OBJECT_MAP |       \
128                                  RBD_FEATURE_FAST_DIFF |        \
129                                  RBD_FEATURE_DEEP_FLATTEN |     \
130                                  RBD_FEATURE_DATA_POOL |        \
131                                  RBD_FEATURE_OPERATIONS)
132
133 /* Features supported by this (client software) implementation. */
134
135 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
136
137 /*
138  * An RBD device name will be "rbd#", where the "rbd" comes from
139  * RBD_DRV_NAME above, and # is a unique integer identifier.
140  */
141 #define DEV_NAME_LEN            32
142
143 /*
144  * block device image metadata (in-memory version)
145  */
146 struct rbd_image_header {
147         /* These six fields never change for a given rbd image */
148         char *object_prefix;
149         __u8 obj_order;
150         u64 stripe_unit;
151         u64 stripe_count;
152         s64 data_pool_id;
153         u64 features;           /* Might be changeable someday? */
154
155         /* The remaining fields need to be updated occasionally */
156         u64 image_size;
157         struct ceph_snap_context *snapc;
158         char *snap_names;       /* format 1 only */
159         u64 *snap_sizes;        /* format 1 only */
160 };
161
162 /*
163  * An rbd image specification.
164  *
165  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
166  * identify an image.  Each rbd_dev structure includes a pointer to
167  * an rbd_spec structure that encapsulates this identity.
168  *
169  * Each of the id's in an rbd_spec has an associated name.  For a
170  * user-mapped image, the names are supplied and the id's associated
171  * with them are looked up.  For a layered image, a parent image is
172  * defined by the tuple, and the names are looked up.
173  *
174  * An rbd_dev structure contains a parent_spec pointer which is
175  * non-null if the image it represents is a child in a layered
176  * image.  This pointer will refer to the rbd_spec structure used
177  * by the parent rbd_dev for its own identity (i.e., the structure
178  * is shared between the parent and child).
179  *
180  * Since these structures are populated once, during the discovery
181  * phase of image construction, they are effectively immutable so
182  * we make no effort to synchronize access to them.
183  *
184  * Note that code herein does not assume the image name is known (it
185  * could be a null pointer).
186  */
187 struct rbd_spec {
188         u64             pool_id;
189         const char      *pool_name;
190         const char      *pool_ns;       /* NULL if default, never "" */
191
192         const char      *image_id;
193         const char      *image_name;
194
195         u64             snap_id;
196         const char      *snap_name;
197
198         struct kref     kref;
199 };
200
201 /*
202  * an instance of the client.  multiple devices may share an rbd client.
203  */
204 struct rbd_client {
205         struct ceph_client      *client;
206         struct kref             kref;
207         struct list_head        node;
208 };
209
210 struct pending_result {
211         int                     result;         /* first nonzero result */
212         int                     num_pending;
213 };
214
215 struct rbd_img_request;
216
217 enum obj_request_type {
218         OBJ_REQUEST_NODATA = 1,
219         OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
220         OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
221         OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
222 };
223
224 enum obj_operation_type {
225         OBJ_OP_READ = 1,
226         OBJ_OP_WRITE,
227         OBJ_OP_DISCARD,
228         OBJ_OP_ZEROOUT,
229 };
230
231 #define RBD_OBJ_FLAG_DELETION                   (1U << 0)
232 #define RBD_OBJ_FLAG_COPYUP_ENABLED             (1U << 1)
233 #define RBD_OBJ_FLAG_COPYUP_ZEROS               (1U << 2)
234 #define RBD_OBJ_FLAG_MAY_EXIST                  (1U << 3)
235 #define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT       (1U << 4)
236
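/*
 * Read state machine: a read starts in RBD_OBJ_READ_START, is issued
 * against the object itself (RBD_OBJ_READ_OBJECT) and, for layered
 * images where the object turns out not to exist, is retried against
 * the parent image (RBD_OBJ_READ_PARENT).
 */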
237 enum rbd_obj_read_state {
238         RBD_OBJ_READ_START = 1,
239         RBD_OBJ_READ_OBJECT,
240         RBD_OBJ_READ_PARENT,
241 };
242
243 /*
244  * Writes go through the following state machine to deal with
245  * layering:
246  *
247  *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
248  *            .                 |                                    .
249  *            .                 v                                    .
250  *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
251  *            .                 |                    .               .
252  *            .                 v                    v (deep-copyup  .
253  *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
254  * flattened) v                 |                    .               .
255  *            .                 v                    .               .
256  *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
257  *                              |                        not needed) v
258  *                              v                                    .
259  *                            done . . . . . . . . . . . . . . . . . .
260  *                              ^
261  *                              |
262  *                     RBD_OBJ_WRITE_FLAT
263  *
264  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
265  * assert_exists guard is needed or not (in some cases it's not needed
266  * even if there is a parent).
267  */
268 enum rbd_obj_write_state {
269         RBD_OBJ_WRITE_START = 1,
270         RBD_OBJ_WRITE_PRE_OBJECT_MAP,
271         RBD_OBJ_WRITE_OBJECT,
272         __RBD_OBJ_WRITE_COPYUP,
273         RBD_OBJ_WRITE_COPYUP,
274         RBD_OBJ_WRITE_POST_OBJECT_MAP,
275 };
276
277 enum rbd_obj_copyup_state {
278         RBD_OBJ_COPYUP_START = 1,
279         RBD_OBJ_COPYUP_READ_PARENT,
280         __RBD_OBJ_COPYUP_OBJECT_MAPS,
281         RBD_OBJ_COPYUP_OBJECT_MAPS,
282         __RBD_OBJ_COPYUP_WRITE_OBJECT,
283         RBD_OBJ_COPYUP_WRITE_OBJECT,
284 };
285
286 struct rbd_obj_request {
287         struct ceph_object_extent ex;
288         unsigned int            flags;  /* RBD_OBJ_FLAG_* */
289         union {
290                 enum rbd_obj_read_state  read_state;    /* for reads */
291                 enum rbd_obj_write_state write_state;   /* for writes */
292         };
293
294         struct rbd_img_request  *img_request;
295         struct ceph_file_extent *img_extents;
296         u32                     num_img_extents;
297
298         union {
299                 struct ceph_bio_iter    bio_pos;
300                 struct {
301                         struct ceph_bvec_iter   bvec_pos;
302                         u32                     bvec_count;
303                         u32                     bvec_idx;
304                 };
305         };
306
307         enum rbd_obj_copyup_state copyup_state;
308         struct bio_vec          *copyup_bvecs;
309         u32                     copyup_bvec_count;
310
311         struct list_head        osd_reqs;       /* w/ r_private_item */
312
313         struct mutex            state_mutex;
314         struct pending_result   pending;
315         struct kref             kref;
316 };
317
318 enum img_req_flags {
319         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
320         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
321 };
322
323 enum rbd_img_state {
324         RBD_IMG_START = 1,
325         RBD_IMG_EXCLUSIVE_LOCK,
326         __RBD_IMG_OBJECT_REQUESTS,
327         RBD_IMG_OBJECT_REQUESTS,
328 };
329
330 struct rbd_img_request {
331         struct rbd_device       *rbd_dev;
332         enum obj_operation_type op_type;
333         enum obj_request_type   data_type;
334         unsigned long           flags;
335         enum rbd_img_state      state;
336         union {
337                 u64                     snap_id;        /* for reads */
338                 struct ceph_snap_context *snapc;        /* for writes */
339         };
340         union {
341                 struct request          *rq;            /* block request */
342                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
343         };
344
345         struct list_head        lock_item;
346         struct list_head        object_extents; /* obj_req.ex structs */
347
348         struct mutex            state_mutex;
349         struct pending_result   pending;
350         struct work_struct      work;
351         int                     work_result;
352         struct kref             kref;
353 };
354
355 #define for_each_obj_request(ireq, oreq) \
356         list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
357 #define for_each_obj_request_safe(ireq, oreq, n) \
358         list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
359
360 enum rbd_watch_state {
361         RBD_WATCH_STATE_UNREGISTERED,
362         RBD_WATCH_STATE_REGISTERED,
363         RBD_WATCH_STATE_ERROR,
364 };
365
366 enum rbd_lock_state {
367         RBD_LOCK_STATE_UNLOCKED,
368         RBD_LOCK_STATE_LOCKED,
369         RBD_LOCK_STATE_RELEASING,
370 };
371
372 /* WatchNotify::ClientId */
373 struct rbd_client_id {
374         u64 gid;
375         u64 handle;
376 };
377
378 struct rbd_mapping {
379         u64                     size;
380 };
381
382 /*
383  * a single device
384  */
385 struct rbd_device {
386         int                     dev_id;         /* blkdev unique id */
387
388         int                     major;          /* blkdev assigned major */
389         int                     minor;
390         struct gendisk          *disk;          /* blkdev's gendisk and rq */
391
392         u32                     image_format;   /* Either 1 or 2 */
393         struct rbd_client       *rbd_client;
394
395         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
396
397         spinlock_t              lock;           /* queue, flags, open_count */
398
399         struct rbd_image_header header;
400         unsigned long           flags;          /* possibly lock protected */
401         struct rbd_spec         *spec;
402         struct rbd_options      *opts;
403         char                    *config_info;   /* add{,_single_major} string */
404
405         struct ceph_object_id   header_oid;
406         struct ceph_object_locator header_oloc;
407
408         struct ceph_file_layout layout;         /* used for all rbd requests */
409
410         struct mutex            watch_mutex;
411         enum rbd_watch_state    watch_state;
412         struct ceph_osd_linger_request *watch_handle;
413         u64                     watch_cookie;
414         struct delayed_work     watch_dwork;
415
416         struct rw_semaphore     lock_rwsem;
417         enum rbd_lock_state     lock_state;
418         char                    lock_cookie[32];
419         struct rbd_client_id    owner_cid;
420         struct work_struct      acquired_lock_work;
421         struct work_struct      released_lock_work;
422         struct delayed_work     lock_dwork;
423         struct work_struct      unlock_work;
424         spinlock_t              lock_lists_lock;
425         struct list_head        acquiring_list;
426         struct list_head        running_list;
427         struct completion       acquire_wait;
428         int                     acquire_err;
429         struct completion       releasing_wait;
430
431         spinlock_t              object_map_lock;
432         u8                      *object_map;
433         u64                     object_map_size;        /* in objects */
434         u64                     object_map_flags;
435
436         struct workqueue_struct *task_wq;
437
438         struct rbd_spec         *parent_spec;
439         u64                     parent_overlap;
440         atomic_t                parent_ref;
441         struct rbd_device       *parent;
442
443         /* Block layer tags. */
444         struct blk_mq_tag_set   tag_set;
445
446         /* protects updating the header */
447         struct rw_semaphore     header_rwsem;
448
449         struct rbd_mapping      mapping;
450
451         struct list_head        node;
452
453         /* sysfs related */
454         struct device           dev;
455         unsigned long           open_count;     /* protected by lock */
456 };
457
458 /*
459  * Flag bits for rbd_dev->flags:
460  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
461  *   by rbd_dev->lock
462  */
463 enum rbd_dev_flags {
464         RBD_DEV_FLAG_EXISTS,    /* rbd_dev_device_setup() ran */
465         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
466         RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
467 };
468
469 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
470
471 static LIST_HEAD(rbd_dev_list);    /* devices */
472 static DEFINE_SPINLOCK(rbd_dev_list_lock);
473
474 static LIST_HEAD(rbd_client_list);              /* clients */
475 static DEFINE_SPINLOCK(rbd_client_list_lock);
476
477 /* Slab caches for frequently-allocated structures */
478
479 static struct kmem_cache        *rbd_img_request_cache;
480 static struct kmem_cache        *rbd_obj_request_cache;
481
482 static int rbd_major;
483 static DEFINE_IDA(rbd_dev_id_ida);
484
485 static struct workqueue_struct *rbd_wq;
486
487 static struct ceph_snap_context rbd_empty_snapc = {
488         .nref = REFCOUNT_INIT(1),
489 };
490
491 /*
492  * single-major requires >= 0.75 version of userspace rbd utility.
493  */
494 static bool single_major = true;
495 module_param(single_major, bool, 0444);
496 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
497
498 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
499 static ssize_t remove_store(struct bus_type *bus, const char *buf,
500                             size_t count);
501 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
502                                       size_t count);
503 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
504                                          size_t count);
505 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
506
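/*
 * With RBD_SINGLE_MAJOR_PART_SHIFT == 4, each mapped device owns a
 * block of 16 minors: dev_id 3 maps to minor 48 (whole device rbd3),
 * leaving minors 49-63 for its partitions (rbd3p1 and so on).
 */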
507 static int rbd_dev_id_to_minor(int dev_id)
508 {
509         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
510 }
511
512 static int minor_to_rbd_dev_id(int minor)
513 {
514         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
515 }
516
517 static bool rbd_is_ro(struct rbd_device *rbd_dev)
518 {
519         return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
520 }
521
522 static bool rbd_is_snap(struct rbd_device *rbd_dev)
523 {
524         return rbd_dev->spec->snap_id != CEPH_NOSNAP;
525 }
526
527 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
528 {
529         lockdep_assert_held(&rbd_dev->lock_rwsem);
530
531         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
532                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
533 }
534
535 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
536 {
537         bool is_lock_owner;
538
539         down_read(&rbd_dev->lock_rwsem);
540         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
541         up_read(&rbd_dev->lock_rwsem);
542         return is_lock_owner;
543 }
544
545 static ssize_t supported_features_show(struct bus_type *bus, char *buf)
546 {
547         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
548 }
549
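/*
 * These bus attributes are exposed under /sys/bus/rbd/ (see the ABI
 * document referenced at the top of this file).
 */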
550 static BUS_ATTR_WO(add);
551 static BUS_ATTR_WO(remove);
552 static BUS_ATTR_WO(add_single_major);
553 static BUS_ATTR_WO(remove_single_major);
554 static BUS_ATTR_RO(supported_features);
555
556 static struct attribute *rbd_bus_attrs[] = {
557         &bus_attr_add.attr,
558         &bus_attr_remove.attr,
559         &bus_attr_add_single_major.attr,
560         &bus_attr_remove_single_major.attr,
561         &bus_attr_supported_features.attr,
562         NULL,
563 };
564
565 static umode_t rbd_bus_is_visible(struct kobject *kobj,
566                                   struct attribute *attr, int index)
567 {
568         if (!single_major &&
569             (attr == &bus_attr_add_single_major.attr ||
570              attr == &bus_attr_remove_single_major.attr))
571                 return 0;
572
573         return attr->mode;
574 }
575
576 static const struct attribute_group rbd_bus_group = {
577         .attrs = rbd_bus_attrs,
578         .is_visible = rbd_bus_is_visible,
579 };
580 __ATTRIBUTE_GROUPS(rbd_bus);
581
582 static struct bus_type rbd_bus_type = {
583         .name           = "rbd",
584         .bus_groups     = rbd_bus_groups,
585 };
586
587 static void rbd_root_dev_release(struct device *dev)
588 {
589 }
590
591 static struct device rbd_root_dev = {
592         .init_name =    "rbd",
593         .release =      rbd_root_dev_release,
594 };
595
596 static __printf(2, 3)
597 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
598 {
599         struct va_format vaf;
600         va_list args;
601
602         va_start(args, fmt);
603         vaf.fmt = fmt;
604         vaf.va = &args;
605
606         if (!rbd_dev)
607                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
608         else if (rbd_dev->disk)
609                 printk(KERN_WARNING "%s: %s: %pV\n",
610                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
611         else if (rbd_dev->spec && rbd_dev->spec->image_name)
612                 printk(KERN_WARNING "%s: image %s: %pV\n",
613                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
614         else if (rbd_dev->spec && rbd_dev->spec->image_id)
615                 printk(KERN_WARNING "%s: id %s: %pV\n",
616                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
617         else    /* punt */
618                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
619                         RBD_DRV_NAME, rbd_dev, &vaf);
620         va_end(args);
621 }
622
623 #ifdef RBD_DEBUG
624 #define rbd_assert(expr)                                                \
625                 if (unlikely(!(expr))) {                                \
626                         printk(KERN_ERR "\nAssertion failure in %s() "  \
627                                                 "at line %d:\n\n"       \
628                                         "\trbd_assert(%s);\n\n",        \
629                                         __func__, __LINE__, #expr);     \
630                         BUG();                                          \
631                 }
632 #else /* !RBD_DEBUG */
633 #  define rbd_assert(expr)      ((void) 0)
634 #endif /* !RBD_DEBUG */
635
636 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
637
638 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
639 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
640 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
641 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
642 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
643                                         u64 snap_id);
644 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
645                                 u8 *order, u64 *snap_size);
646 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
647
648 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
649 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
650
651 /*
652  * Return true if nothing else is pending.
653  */
654 static bool pending_result_dec(struct pending_result *pending, int *result)
655 {
656         rbd_assert(pending->num_pending > 0);
657
658         if (*result && !pending->result)
659                 pending->result = *result;
660         if (--pending->num_pending)
661                 return false;
662
663         *result = pending->result;
664         return true;
665 }
666
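/*
 * Opening the block device bumps open_count (unless the mapping is
 * being removed) and takes a reference on the sysfs device so the
 * rbd_device can't go away while it is open.
 */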
667 static int rbd_open(struct block_device *bdev, fmode_t mode)
668 {
669         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
670         bool removing = false;
671
672         spin_lock_irq(&rbd_dev->lock);
673         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
674                 removing = true;
675         else
676                 rbd_dev->open_count++;
677         spin_unlock_irq(&rbd_dev->lock);
678         if (removing)
679                 return -ENOENT;
680
681         (void) get_device(&rbd_dev->dev);
682
683         return 0;
684 }
685
686 static void rbd_release(struct gendisk *disk, fmode_t mode)
687 {
688         struct rbd_device *rbd_dev = disk->private_data;
689         unsigned long open_count_before;
690
691         spin_lock_irq(&rbd_dev->lock);
692         open_count_before = rbd_dev->open_count--;
693         spin_unlock_irq(&rbd_dev->lock);
694         rbd_assert(open_count_before > 0);
695
696         put_device(&rbd_dev->dev);
697 }
698
699 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
700 {
701         int ro;
702
703         if (get_user(ro, (int __user *)arg))
704                 return -EFAULT;
705
706         /*
707          * Neither images mapped read-only nor snapshots can be marked
708          * read-write.
709          */
710         if (!ro) {
711                 if (rbd_is_ro(rbd_dev))
712                         return -EROFS;
713
714                 rbd_assert(!rbd_is_snap(rbd_dev));
715         }
716
717         /* Let blkdev_roset() handle it */
718         return -ENOTTY;
719 }
720
721 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
722                         unsigned int cmd, unsigned long arg)
723 {
724         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
725         int ret;
726
727         switch (cmd) {
728         case BLKROSET:
729                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
730                 break;
731         default:
732                 ret = -ENOTTY;
733         }
734
735         return ret;
736 }
737
738 #ifdef CONFIG_COMPAT
739 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
740                                 unsigned int cmd, unsigned long arg)
741 {
742         return rbd_ioctl(bdev, mode, cmd, arg);
743 }
744 #endif /* CONFIG_COMPAT */
745
746 static const struct block_device_operations rbd_bd_ops = {
747         .owner                  = THIS_MODULE,
748         .open                   = rbd_open,
749         .release                = rbd_release,
750         .ioctl                  = rbd_ioctl,
751 #ifdef CONFIG_COMPAT
752         .compat_ioctl           = rbd_compat_ioctl,
753 #endif
754 };
755
756 /*
757  * Initialize an rbd client instance.  Success or not, this function
758  * consumes ceph_opts.  Caller holds client_mutex.
759  */
760 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
761 {
762         struct rbd_client *rbdc;
763         int ret = -ENOMEM;
764
765         dout("%s:\n", __func__);
766         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
767         if (!rbdc)
768                 goto out_opt;
769
770         kref_init(&rbdc->kref);
771         INIT_LIST_HEAD(&rbdc->node);
772
773         rbdc->client = ceph_create_client(ceph_opts, rbdc);
774         if (IS_ERR(rbdc->client))
775                 goto out_rbdc;
776         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
777
778         ret = ceph_open_session(rbdc->client);
779         if (ret < 0)
780                 goto out_client;
781
782         spin_lock(&rbd_client_list_lock);
783         list_add_tail(&rbdc->node, &rbd_client_list);
784         spin_unlock(&rbd_client_list_lock);
785
786         dout("%s: rbdc %p\n", __func__, rbdc);
787
788         return rbdc;
789 out_client:
790         ceph_destroy_client(rbdc->client);
791 out_rbdc:
792         kfree(rbdc);
793 out_opt:
794         if (ceph_opts)
795                 ceph_destroy_options(ceph_opts);
796         dout("%s: error %d\n", __func__, ret);
797
798         return ERR_PTR(ret);
799 }
800
801 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
802 {
803         kref_get(&rbdc->kref);
804
805         return rbdc;
806 }
807
808 /*
809  * Find a ceph client with specific addr and configuration.  If
810  * found, bump its reference count.
811  */
812 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
813 {
814         struct rbd_client *client_node;
815         bool found = false;
816
817         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
818                 return NULL;
819
820         spin_lock(&rbd_client_list_lock);
821         list_for_each_entry(client_node, &rbd_client_list, node) {
822                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
823                         __rbd_get_client(client_node);
824
825                         found = true;
826                         break;
827                 }
828         }
829         spin_unlock(&rbd_client_list_lock);
830
831         return found ? client_node : NULL;
832 }
833
834 /*
835  * (Per device) rbd map options
836  */
837 enum {
838         Opt_queue_depth,
839         Opt_alloc_size,
840         Opt_lock_timeout,
841         Opt_last_int,
842         /* int args above */
843         Opt_pool_ns,
844         Opt_last_string,
845         /* string args above */
846         Opt_read_only,
847         Opt_read_write,
848         Opt_lock_on_read,
849         Opt_exclusive,
850         Opt_notrim,
851         Opt_err
852 };
853
854 static match_table_t rbd_opts_tokens = {
855         {Opt_queue_depth, "queue_depth=%d"},
856         {Opt_alloc_size, "alloc_size=%d"},
857         {Opt_lock_timeout, "lock_timeout=%d"},
858         /* int args above */
859         {Opt_pool_ns, "_pool_ns=%s"},
860         /* string args above */
861         {Opt_read_only, "read_only"},
862         {Opt_read_only, "ro"},          /* Alternate spelling */
863         {Opt_read_write, "read_write"},
864         {Opt_read_write, "rw"},         /* Alternate spelling */
865         {Opt_lock_on_read, "lock_on_read"},
866         {Opt_exclusive, "exclusive"},
867         {Opt_notrim, "notrim"},
868         {Opt_err, NULL}
869 };
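
/*
 * Illustrative options string accepted by the parser below, combining
 * tokens from the table above (the values are examples only):
 *
 *   "queue_depth=128,alloc_size=65536,lock_on_read,read_only"
 */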
870
871 struct rbd_options {
872         int     queue_depth;
873         int     alloc_size;
874         unsigned long   lock_timeout;
875         bool    read_only;
876         bool    lock_on_read;
877         bool    exclusive;
878         bool    trim;
879 };
880
881 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
882 #define RBD_ALLOC_SIZE_DEFAULT  (64 * 1024)
883 #define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
884 #define RBD_READ_ONLY_DEFAULT   false
885 #define RBD_LOCK_ON_READ_DEFAULT false
886 #define RBD_EXCLUSIVE_DEFAULT   false
887 #define RBD_TRIM_DEFAULT        true
888
889 struct parse_rbd_opts_ctx {
890         struct rbd_spec         *spec;
891         struct rbd_options      *opts;
892 };
893
894 static int parse_rbd_opts_token(char *c, void *private)
895 {
896         struct parse_rbd_opts_ctx *pctx = private;
897         substring_t argstr[MAX_OPT_ARGS];
898         int token, intval, ret;
899
900         token = match_token(c, rbd_opts_tokens, argstr);
901         if (token < Opt_last_int) {
902                 ret = match_int(&argstr[0], &intval);
903                 if (ret < 0) {
904                         pr_err("bad option arg (not int) at '%s'\n", c);
905                         return ret;
906                 }
907                 dout("got int token %d val %d\n", token, intval);
908         } else if (token > Opt_last_int && token < Opt_last_string) {
909                 dout("got string token %d val %s\n", token, argstr[0].from);
910         } else {
911                 dout("got token %d\n", token);
912         }
913
914         switch (token) {
915         case Opt_queue_depth:
916                 if (intval < 1) {
917                         pr_err("queue_depth out of range\n");
918                         return -EINVAL;
919                 }
920                 pctx->opts->queue_depth = intval;
921                 break;
922         case Opt_alloc_size:
923                 if (intval < SECTOR_SIZE) {
924                         pr_err("alloc_size out of range\n");
925                         return -EINVAL;
926                 }
927                 if (!is_power_of_2(intval)) {
928                         pr_err("alloc_size must be a power of 2\n");
929                         return -EINVAL;
930                 }
931                 pctx->opts->alloc_size = intval;
932                 break;
933         case Opt_lock_timeout:
934                 /* 0 is "wait forever" (i.e. infinite timeout) */
935                 if (intval < 0 || intval > INT_MAX / 1000) {
936                         pr_err("lock_timeout out of range\n");
937                         return -EINVAL;
938                 }
939                 pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
940                 break;
941         case Opt_pool_ns:
942                 kfree(pctx->spec->pool_ns);
943                 pctx->spec->pool_ns = match_strdup(argstr);
944                 if (!pctx->spec->pool_ns)
945                         return -ENOMEM;
946                 break;
947         case Opt_read_only:
948                 pctx->opts->read_only = true;
949                 break;
950         case Opt_read_write:
951                 pctx->opts->read_only = false;
952                 break;
953         case Opt_lock_on_read:
954                 pctx->opts->lock_on_read = true;
955                 break;
956         case Opt_exclusive:
957                 pctx->opts->exclusive = true;
958                 break;
959         case Opt_notrim:
960                 pctx->opts->trim = false;
961                 break;
962         default:
963                 /* libceph prints "bad option" msg */
964                 return -EINVAL;
965         }
966
967         return 0;
968 }
969
970 static char* obj_op_name(enum obj_operation_type op_type)
971 {
972         switch (op_type) {
973         case OBJ_OP_READ:
974                 return "read";
975         case OBJ_OP_WRITE:
976                 return "write";
977         case OBJ_OP_DISCARD:
978                 return "discard";
979         case OBJ_OP_ZEROOUT:
980                 return "zeroout";
981         default:
982                 return "???";
983         }
984 }
985
986 /*
987  * Destroy ceph client
988  *
989  * Caller must hold rbd_client_list_lock.
990  */
991 static void rbd_client_release(struct kref *kref)
992 {
993         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
994
995         dout("%s: rbdc %p\n", __func__, rbdc);
996         spin_lock(&rbd_client_list_lock);
997         list_del(&rbdc->node);
998         spin_unlock(&rbd_client_list_lock);
999
1000         ceph_destroy_client(rbdc->client);
1001         kfree(rbdc);
1002 }
1003
1004 /*
1005  * Drop reference to ceph client node. If it's not referenced anymore, release
1006  * it.
1007  */
1008 static void rbd_put_client(struct rbd_client *rbdc)
1009 {
1010         if (rbdc)
1011                 kref_put(&rbdc->kref, rbd_client_release);
1012 }
1013
1014 /*
1015  * Get a ceph client with specific addr and configuration; if one does
1016  * not exist, create it.  Either way, ceph_opts is consumed by this
1017  * function.
1018  */
1019 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
1020 {
1021         struct rbd_client *rbdc;
1022         int ret;
1023
1024         mutex_lock(&client_mutex);
1025         rbdc = rbd_client_find(ceph_opts);
1026         if (rbdc) {
1027                 ceph_destroy_options(ceph_opts);
1028
1029                 /*
1030                  * Using an existing client.  Make sure ->pg_pools is up to
1031                  * date before we look up the pool id in do_rbd_add().
1032                  */
1033                 ret = ceph_wait_for_latest_osdmap(rbdc->client,
1034                                         rbdc->client->options->mount_timeout);
1035                 if (ret) {
1036                         rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
1037                         rbd_put_client(rbdc);
1038                         rbdc = ERR_PTR(ret);
1039                 }
1040         } else {
1041                 rbdc = rbd_client_create(ceph_opts);
1042         }
1043         mutex_unlock(&client_mutex);
1044
1045         return rbdc;
1046 }
1047
1048 static bool rbd_image_format_valid(u32 image_format)
1049 {
1050         return image_format == 1 || image_format == 2;
1051 }
1052
1053 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
1054 {
1055         size_t size;
1056         u32 snap_count;
1057
1058         /* The header has to start with the magic rbd header text */
1059         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
1060                 return false;
1061
1062         /* The bio layer requires at least sector-sized I/O */
1063
1064         if (ondisk->options.order < SECTOR_SHIFT)
1065                 return false;
1066
1067         /* If we use u64 in a few spots we may be able to loosen this */
1068
1069         if (ondisk->options.order > 8 * sizeof (int) - 1)
1070                 return false;
1071
1072         /*
1073          * The size of a snapshot header has to fit in a size_t, and
1074          * that limits the number of snapshots.
1075          */
1076         snap_count = le32_to_cpu(ondisk->snap_count);
1077         size = SIZE_MAX - sizeof (struct ceph_snap_context);
1078         if (snap_count > size / sizeof (__le64))
1079                 return false;
1080
1081         /*
1082          * Not only that, but the size of the entire snapshot
1083          * header must also be representable in a size_t.
1084          */
1085         size -= snap_count * sizeof (__le64);
1086         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
1087                 return false;
1088
1089         return true;
1090 }
1091
1092 /*
1093  * returns the size of an object in the image
1094  */
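/* (1 << obj_order; e.g. the common default order of 22 gives 4 MiB objects) */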
1095 static u32 rbd_obj_bytes(struct rbd_image_header *header)
1096 {
1097         return 1U << header->obj_order;
1098 }
1099
1100 static void rbd_init_layout(struct rbd_device *rbd_dev)
1101 {
1102         if (rbd_dev->header.stripe_unit == 0 ||
1103             rbd_dev->header.stripe_count == 0) {
1104                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
1105                 rbd_dev->header.stripe_count = 1;
1106         }
1107
1108         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1109         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1110         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1111         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1112                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1113         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1114 }
1115
1116 /*
1117  * Fill an rbd image header with information from the given format 1
1118  * on-disk header.
1119  */
1120 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1121                                  struct rbd_image_header_ondisk *ondisk)
1122 {
1123         struct rbd_image_header *header = &rbd_dev->header;
1124         bool first_time = header->object_prefix == NULL;
1125         struct ceph_snap_context *snapc;
1126         char *object_prefix = NULL;
1127         char *snap_names = NULL;
1128         u64 *snap_sizes = NULL;
1129         u32 snap_count;
1130         int ret = -ENOMEM;
1131         u32 i;
1132
1133         /* Allocate this now to avoid having to handle failure below */
1134
1135         if (first_time) {
1136                 object_prefix = kstrndup(ondisk->object_prefix,
1137                                          sizeof(ondisk->object_prefix),
1138                                          GFP_KERNEL);
1139                 if (!object_prefix)
1140                         return -ENOMEM;
1141         }
1142
1143         /* Allocate the snapshot context and fill it in */
1144
1145         snap_count = le32_to_cpu(ondisk->snap_count);
1146         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1147         if (!snapc)
1148                 goto out_err;
1149         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1150         if (snap_count) {
1151                 struct rbd_image_snap_ondisk *snaps;
1152                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1153
1154                 /* We'll keep a copy of the snapshot names... */
1155
1156                 if (snap_names_len > (u64)SIZE_MAX)
1157                         goto out_2big;
1158                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1159                 if (!snap_names)
1160                         goto out_err;
1161
1162                 /* ...as well as the array of their sizes. */
1163                 snap_sizes = kmalloc_array(snap_count,
1164                                            sizeof(*header->snap_sizes),
1165                                            GFP_KERNEL);
1166                 if (!snap_sizes)
1167                         goto out_err;
1168
1169                 /*
1170                  * Copy the names, and fill in each snapshot's id
1171                  * and size.
1172                  *
1173                  * Note that rbd_dev_v1_header_info() guarantees the
1174                  * ondisk buffer we're working with has
1175                  * snap_names_len bytes beyond the end of the
1176                  * snapshot id array, so this memcpy() is safe.
1177                  */
1178                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1179                 snaps = ondisk->snaps;
1180                 for (i = 0; i < snap_count; i++) {
1181                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1182                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1183                 }
1184         }
1185
1186         /* We won't fail any more, fill in the header */
1187
1188         if (first_time) {
1189                 header->object_prefix = object_prefix;
1190                 header->obj_order = ondisk->options.order;
1191                 rbd_init_layout(rbd_dev);
1192         } else {
1193                 ceph_put_snap_context(header->snapc);
1194                 kfree(header->snap_names);
1195                 kfree(header->snap_sizes);
1196         }
1197
1198         /* The remaining fields always get updated (when we refresh) */
1199
1200         header->image_size = le64_to_cpu(ondisk->image_size);
1201         header->snapc = snapc;
1202         header->snap_names = snap_names;
1203         header->snap_sizes = snap_sizes;
1204
1205         return 0;
1206 out_2big:
1207         ret = -EIO;
1208 out_err:
1209         kfree(snap_sizes);
1210         kfree(snap_names);
1211         ceph_put_snap_context(snapc);
1212         kfree(object_prefix);
1213
1214         return ret;
1215 }
1216
1217 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1218 {
1219         const char *snap_name;
1220
1221         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1222
1223         /* Skip over names until we find the one we are looking for */
1224
1225         snap_name = rbd_dev->header.snap_names;
1226         while (which--)
1227                 snap_name += strlen(snap_name) + 1;
1228
1229         return kstrdup(snap_name, GFP_KERNEL);
1230 }
1231
1232 /*
1233  * Snapshot id comparison function for use with qsort()/bsearch().
1234  * Note that result is for snapshots in *descending* order.
1235  */
1236 static int snapid_compare_reverse(const void *s1, const void *s2)
1237 {
1238         u64 snap_id1 = *(u64 *)s1;
1239         u64 snap_id2 = *(u64 *)s2;
1240
1241         if (snap_id1 < snap_id2)
1242                 return 1;
1243         return snap_id1 == snap_id2 ? 0 : -1;
1244 }
1245
1246 /*
1247  * Search a snapshot context to see if the given snapshot id is
1248  * present.
1249  *
1250  * Returns the position of the snapshot id in the array if it's found,
1251  * or BAD_SNAP_INDEX otherwise.
1252  *
1253  * Note: The snapshot array is kept sorted (by the osd) in
1254  * reverse order, highest snapshot id first.
1255  */
1256 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1257 {
1258         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1259         u64 *found;
1260
1261         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1262                                 sizeof (snap_id), snapid_compare_reverse);
1263
1264         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1265 }
1266
1267 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1268                                         u64 snap_id)
1269 {
1270         u32 which;
1271         const char *snap_name;
1272
1273         which = rbd_dev_snap_index(rbd_dev, snap_id);
1274         if (which == BAD_SNAP_INDEX)
1275                 return ERR_PTR(-ENOENT);
1276
1277         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1278         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1279 }
1280
1281 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1282 {
1283         if (snap_id == CEPH_NOSNAP)
1284                 return RBD_SNAP_HEAD_NAME;
1285
1286         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1287         if (rbd_dev->image_format == 1)
1288                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1289
1290         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1291 }
1292
1293 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1294                                 u64 *snap_size)
1295 {
1296         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1297         if (snap_id == CEPH_NOSNAP) {
1298                 *snap_size = rbd_dev->header.image_size;
1299         } else if (rbd_dev->image_format == 1) {
1300                 u32 which;
1301
1302                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1303                 if (which == BAD_SNAP_INDEX)
1304                         return -ENOENT;
1305
1306                 *snap_size = rbd_dev->header.snap_sizes[which];
1307         } else {
1308                 u64 size = 0;
1309                 int ret;
1310
1311                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1312                 if (ret)
1313                         return ret;
1314
1315                 *snap_size = size;
1316         }
1317         return 0;
1318 }
1319
1320 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1321 {
1322         u64 snap_id = rbd_dev->spec->snap_id;
1323         u64 size = 0;
1324         int ret;
1325
1326         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1327         if (ret)
1328                 return ret;
1329
1330         rbd_dev->mapping.size = size;
1331         return 0;
1332 }
1333
1334 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1335 {
1336         rbd_dev->mapping.size = 0;
1337 }
1338
1339 static void zero_bvec(struct bio_vec *bv)
1340 {
1341         void *buf;
1342         unsigned long flags;
1343
1344         buf = bvec_kmap_irq(bv, &flags);
1345         memset(buf, 0, bv->bv_len);
1346         flush_dcache_page(bv->bv_page);
1347         bvec_kunmap_irq(buf, &flags);
1348 }
1349
1350 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1351 {
1352         struct ceph_bio_iter it = *bio_pos;
1353
1354         ceph_bio_iter_advance(&it, off);
1355         ceph_bio_iter_advance_step(&it, bytes, ({
1356                 zero_bvec(&bv);
1357         }));
1358 }
1359
1360 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1361 {
1362         struct ceph_bvec_iter it = *bvec_pos;
1363
1364         ceph_bvec_iter_advance(&it, off);
1365         ceph_bvec_iter_advance_step(&it, bytes, ({
1366                 zero_bvec(&bv);
1367         }));
1368 }
1369
1370 /*
1371  * Zero a range in @obj_req data buffer defined by a bio (list) or
1372  * (private) bio_vec array.
1373  *
1374  * @off is relative to the start of the data buffer.
1375  */
1376 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1377                                u32 bytes)
1378 {
1379         dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1380
1381         switch (obj_req->img_request->data_type) {
1382         case OBJ_REQUEST_BIO:
1383                 zero_bios(&obj_req->bio_pos, off, bytes);
1384                 break;
1385         case OBJ_REQUEST_BVECS:
1386         case OBJ_REQUEST_OWN_BVECS:
1387                 zero_bvecs(&obj_req->bvec_pos, off, bytes);
1388                 break;
1389         default:
1390                 BUG();
1391         }
1392 }
1393
1394 static void rbd_obj_request_destroy(struct kref *kref);
1395 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1396 {
1397         rbd_assert(obj_request != NULL);
1398         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1399                 kref_read(&obj_request->kref));
1400         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1401 }
1402
1403 static void rbd_img_request_destroy(struct kref *kref);
1404 static void rbd_img_request_put(struct rbd_img_request *img_request)
1405 {
1406         rbd_assert(img_request != NULL);
1407         dout("%s: img %p (was %d)\n", __func__, img_request,
1408                 kref_read(&img_request->kref));
1409         kref_put(&img_request->kref, rbd_img_request_destroy);
1410 }
1411
1412 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1413                                         struct rbd_obj_request *obj_request)
1414 {
1415         rbd_assert(obj_request->img_request == NULL);
1416
1417         /* Image request now owns object's original reference */
1418         obj_request->img_request = img_request;
1419         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1420 }
1421
1422 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1423                                         struct rbd_obj_request *obj_request)
1424 {
1425         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1426         list_del(&obj_request->ex.oe_item);
1427         rbd_assert(obj_request->img_request == img_request);
1428         rbd_obj_request_put(obj_request);
1429 }
1430
1431 static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1432 {
1433         struct rbd_obj_request *obj_req = osd_req->r_priv;
1434
1435         dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1436              __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1437              obj_req->ex.oe_off, obj_req->ex.oe_len);
1438         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1439 }
1440
1441 /*
1442  * The default/initial value for all image request flags is 0.  Each
1443  * is conditionally set to 1 at image request initialization time
1444  * and currently never changes thereafter.
1445  */
1446 static void img_request_layered_set(struct rbd_img_request *img_request)
1447 {
1448         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1449         smp_mb();
1450 }
1451
1452 static void img_request_layered_clear(struct rbd_img_request *img_request)
1453 {
1454         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1455         smp_mb();
1456 }
1457
1458 static bool img_request_layered_test(struct rbd_img_request *img_request)
1459 {
1460         smp_mb();
1461         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1462 }
1463
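/*
 * rbd_obj_is_entire(): the request covers its object completely;
 * rbd_obj_is_tail(): the request ends exactly at the object boundary.
 */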
1464 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1465 {
1466         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1467
1468         return !obj_req->ex.oe_off &&
1469                obj_req->ex.oe_len == rbd_dev->layout.object_size;
1470 }
1471
1472 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1473 {
1474         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1475
1476         return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1477                                         rbd_dev->layout.object_size;
1478 }
1479
1480 /*
1481  * Must be called after rbd_obj_calc_img_extents().
1482  */
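/*
 * Copyup can only be needed if the object overlaps the parent (there are
 * parent image extents) and either the write doesn't cover the whole
 * object or snapshots exist that would require a deep-copyup.
 */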
1483 static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
1484 {
1485         if (!obj_req->num_img_extents ||
1486             (rbd_obj_is_entire(obj_req) &&
1487              !obj_req->img_request->snapc->num_snaps))
1488                 return false;
1489
1490         return true;
1491 }
1492
1493 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1494 {
1495         return ceph_file_extents_bytes(obj_req->img_extents,
1496                                        obj_req->num_img_extents);
1497 }
1498
1499 static bool rbd_img_is_write(struct rbd_img_request *img_req)
1500 {
1501         switch (img_req->op_type) {
1502         case OBJ_OP_READ:
1503                 return false;
1504         case OBJ_OP_WRITE:
1505         case OBJ_OP_DISCARD:
1506         case OBJ_OP_ZEROOUT:
1507                 return true;
1508         default:
1509                 BUG();
1510         }
1511 }
1512
1513 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1514 {
1515         struct rbd_obj_request *obj_req = osd_req->r_priv;
1516         int result;
1517
1518         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1519              osd_req->r_result, obj_req);
1520
1521         /*
1522          * Writes aren't allowed to return a data payload.  In some
1523          * guarded write cases (e.g. stat + zero on an empty object)
1524          * a stat response makes it through, but we don't care.
1525          */
1526         if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1527                 result = 0;
1528         else
1529                 result = osd_req->r_result;
1530
1531         rbd_obj_handle_request(obj_req, result);
1532 }
1533
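/*
 * Reads are issued against the mapped snapshot (r_snapid); writes get
 * the WRITE flag, the current time as the mtime and the object offset.
 */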
1534 static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1535 {
1536         struct rbd_obj_request *obj_request = osd_req->r_priv;
1537
1538         osd_req->r_flags = CEPH_OSD_FLAG_READ;
1539         osd_req->r_snapid = obj_request->img_request->snap_id;
1540 }
1541
1542 static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1543 {
1544         struct rbd_obj_request *obj_request = osd_req->r_priv;
1545
1546         osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1547         ktime_get_real_ts64(&osd_req->r_mtime);
1548         osd_req->r_data_offset = obj_request->ex.oe_off;
1549 }
1550
1551 static struct ceph_osd_request *
1552 __rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1553                           struct ceph_snap_context *snapc, int num_ops)
1554 {
1555         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1556         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1557         struct ceph_osd_request *req;
1558         const char *name_format = rbd_dev->image_format == 1 ?
1559                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1560         int ret;
1561
1562         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1563         if (!req)
1564                 return ERR_PTR(-ENOMEM);
1565
1566         list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1567         req->r_callback = rbd_osd_req_callback;
1568         req->r_priv = obj_req;
1569
1570         /*
1571          * Data objects may be stored in a separate pool, but they always
1572          * use the same namespace there as the header does in its own pool.
1573          */
1574         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1575         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1576
1577         ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1578                                rbd_dev->header.object_prefix,
1579                                obj_req->ex.oe_objno);
1580         if (ret)
1581                 return ERR_PTR(ret);
1582
1583         return req;
1584 }
1585
1586 static struct ceph_osd_request *
1587 rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1588 {
1589         return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1590                                          num_ops);
1591 }
1592
1593 static struct rbd_obj_request *rbd_obj_request_create(void)
1594 {
1595         struct rbd_obj_request *obj_request;
1596
1597         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1598         if (!obj_request)
1599                 return NULL;
1600
1601         ceph_object_extent_init(&obj_request->ex);
1602         INIT_LIST_HEAD(&obj_request->osd_reqs);
1603         mutex_init(&obj_request->state_mutex);
1604         kref_init(&obj_request->kref);
1605
1606         dout("%s %p\n", __func__, obj_request);
1607         return obj_request;
1608 }
1609
1610 static void rbd_obj_request_destroy(struct kref *kref)
1611 {
1612         struct rbd_obj_request *obj_request;
1613         struct ceph_osd_request *osd_req;
1614         u32 i;
1615
1616         obj_request = container_of(kref, struct rbd_obj_request, kref);
1617
1618         dout("%s: obj %p\n", __func__, obj_request);
1619
1620         while (!list_empty(&obj_request->osd_reqs)) {
1621                 osd_req = list_first_entry(&obj_request->osd_reqs,
1622                                     struct ceph_osd_request, r_private_item);
1623                 list_del_init(&osd_req->r_private_item);
1624                 ceph_osdc_put_request(osd_req);
1625         }
1626
1627         switch (obj_request->img_request->data_type) {
1628         case OBJ_REQUEST_NODATA:
1629         case OBJ_REQUEST_BIO:
1630         case OBJ_REQUEST_BVECS:
1631                 break;          /* Nothing to do */
1632         case OBJ_REQUEST_OWN_BVECS:
1633                 kfree(obj_request->bvec_pos.bvecs);
1634                 break;
1635         default:
1636                 BUG();
1637         }
1638
1639         kfree(obj_request->img_extents);
1640         if (obj_request->copyup_bvecs) {
1641                 for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1642                         if (obj_request->copyup_bvecs[i].bv_page)
1643                                 __free_page(obj_request->copyup_bvecs[i].bv_page);
1644                 }
1645                 kfree(obj_request->copyup_bvecs);
1646         }
1647
1648         kmem_cache_free(rbd_obj_request_cache, obj_request);
1649 }
1650
1651 /* It's OK to call this for a device with no parent */
1652
1653 static void rbd_spec_put(struct rbd_spec *spec);
1654 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1655 {
1656         rbd_dev_remove_parent(rbd_dev);
1657         rbd_spec_put(rbd_dev->parent_spec);
1658         rbd_dev->parent_spec = NULL;
1659         rbd_dev->parent_overlap = 0;
1660 }
1661
/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down: once there are no
 * more in-flight requests to the parent image, i.e. when the last
 * reference is dropped, it is safe to clean them up.
 */
1668 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1669 {
1670         int counter;
1671
1672         if (!rbd_dev->parent_spec)
1673                 return;
1674
1675         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1676         if (counter > 0)
1677                 return;
1678
1679         /* Last reference; clean up parent data structures */
1680
1681         if (!counter)
1682                 rbd_dev_unparent(rbd_dev);
1683         else
1684                 rbd_warn(rbd_dev, "parent reference underflow");
1685 }
1686
1687 /*
1688  * If an image has a non-zero parent overlap, get a reference to its
1689  * parent.
1690  *
1691  * Returns true if the rbd device has a parent with a non-zero
1692  * overlap and a reference for it was successfully taken, or
1693  * false otherwise.
1694  */
1695 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1696 {
1697         int counter = 0;
1698
1699         if (!rbd_dev->parent_spec)
1700                 return false;
1701
1702         down_read(&rbd_dev->header_rwsem);
1703         if (rbd_dev->parent_overlap)
1704                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1705         up_read(&rbd_dev->header_rwsem);
1706
1707         if (counter < 0)
1708                 rbd_warn(rbd_dev, "parent reference overflow");
1709
1710         return counter > 0;
1711 }
1712
1713 /*
1714  * Caller is responsible for filling in the list of object requests
1715  * that comprises the image request, and the Linux request pointer
1716  * (if there is one).
1717  */
1718 static struct rbd_img_request *rbd_img_request_create(
1719                                         struct rbd_device *rbd_dev,
1720                                         enum obj_operation_type op_type,
1721                                         struct ceph_snap_context *snapc)
1722 {
1723         struct rbd_img_request *img_request;
1724
1725         img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
1726         if (!img_request)
1727                 return NULL;
1728
1729         img_request->rbd_dev = rbd_dev;
1730         img_request->op_type = op_type;
1731         if (!rbd_img_is_write(img_request))
1732                 img_request->snap_id = rbd_dev->spec->snap_id;
1733         else
1734                 img_request->snapc = snapc;
1735
1736         if (rbd_dev_parent_get(rbd_dev))
1737                 img_request_layered_set(img_request);
1738
1739         INIT_LIST_HEAD(&img_request->lock_item);
1740         INIT_LIST_HEAD(&img_request->object_extents);
1741         mutex_init(&img_request->state_mutex);
1742         kref_init(&img_request->kref);
1743
1744         return img_request;
1745 }
1746
1747 static void rbd_img_request_destroy(struct kref *kref)
1748 {
1749         struct rbd_img_request *img_request;
1750         struct rbd_obj_request *obj_request;
1751         struct rbd_obj_request *next_obj_request;
1752
1753         img_request = container_of(kref, struct rbd_img_request, kref);
1754
1755         dout("%s: img %p\n", __func__, img_request);
1756
1757         WARN_ON(!list_empty(&img_request->lock_item));
1758         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1759                 rbd_img_obj_request_del(img_request, obj_request);
1760
1761         if (img_request_layered_test(img_request)) {
1762                 img_request_layered_clear(img_request);
1763                 rbd_dev_parent_put(img_request->rbd_dev);
1764         }
1765
1766         if (rbd_img_is_write(img_request))
1767                 ceph_put_snap_context(img_request->snapc);
1768
1769         kmem_cache_free(rbd_img_request_cache, img_request);
1770 }
1771
1772 #define BITS_PER_OBJ    2
1773 #define OBJS_PER_BYTE   (BITS_PER_BYTE / BITS_PER_OBJ)
1774 #define OBJ_MASK        ((1 << BITS_PER_OBJ) - 1)
1775
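/*
 * Each object's state occupies BITS_PER_OBJ (2) bits, packed four to a
 * byte, most significant bits first: object 0 lives in bits 7:6 of
 * byte 0, object 1 in bits 5:4, and so on.  For example, objno 5 yields
 * index 1, off 1 and shift (4 - 1 - 1) * 2 = 4, i.e. bits 5:4 of byte 1
 * of the object map.
 */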
1776 static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1777                                    u64 *index, u8 *shift)
1778 {
1779         u32 off;
1780
1781         rbd_assert(objno < rbd_dev->object_map_size);
1782         *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1783         *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1784 }
1785
1786 static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1787 {
1788         u64 index;
1789         u8 shift;
1790
1791         lockdep_assert_held(&rbd_dev->object_map_lock);
1792         __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1793         return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1794 }
1795
1796 static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1797 {
1798         u64 index;
1799         u8 shift;
1800         u8 *p;
1801
1802         lockdep_assert_held(&rbd_dev->object_map_lock);
1803         rbd_assert(!(val & ~OBJ_MASK));
1804
1805         __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1806         p = &rbd_dev->object_map[index];
1807         *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1808 }
1809
1810 static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1811 {
1812         u8 state;
1813
1814         spin_lock(&rbd_dev->object_map_lock);
1815         state = __rbd_object_map_get(rbd_dev, objno);
1816         spin_unlock(&rbd_dev->object_map_lock);
1817         return state;
1818 }
1819
1820 static bool use_object_map(struct rbd_device *rbd_dev)
1821 {
1822         /*
1823          * An image mapped read-only can't use the object map -- it isn't
1824          * loaded because the header lock isn't acquired.  Someone else can
1825          * write to the image and update the object map behind our back.
1826          *
1827          * A snapshot can't be written to, so using the object map is always
1828          * safe.
1829          */
1830         if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1831                 return false;
1832
1833         return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1834                 !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1835 }
1836
1837 static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1838 {
1839         u8 state;
1840
1841         /* fall back to default logic if object map is disabled or invalid */
1842         if (!use_object_map(rbd_dev))
1843                 return true;
1844
1845         state = rbd_object_map_get(rbd_dev, objno);
1846         return state != OBJECT_NONEXISTENT;
1847 }
1848
1849 static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1850                                 struct ceph_object_id *oid)
1851 {
1852         if (snap_id == CEPH_NOSNAP)
1853                 ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1854                                 rbd_dev->spec->image_id);
1855         else
1856                 ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1857                                 rbd_dev->spec->image_id, snap_id);
1858 }
1859
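/*
 * Take the exclusive lock on the HEAD object map object.  If it is
 * already held by someone else (e.g. a stale lock left behind by a dead
 * client), look up the current locker, break its lock at most once and
 * retry.  -EEXIST from ceph_cls_lock() means we already own the lock
 * and is treated as success.
 */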
1860 static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1861 {
1862         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1863         CEPH_DEFINE_OID_ONSTACK(oid);
1864         u8 lock_type;
1865         char *lock_tag;
1866         struct ceph_locker *lockers;
1867         u32 num_lockers;
1868         bool broke_lock = false;
1869         int ret;
1870
1871         rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1872
1873 again:
1874         ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1875                             CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1876         if (ret != -EBUSY || broke_lock) {
1877                 if (ret == -EEXIST)
1878                         ret = 0; /* already locked by myself */
1879                 if (ret)
1880                         rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1881                 return ret;
1882         }
1883
1884         ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1885                                  RBD_LOCK_NAME, &lock_type, &lock_tag,
1886                                  &lockers, &num_lockers);
1887         if (ret) {
1888                 if (ret == -ENOENT)
1889                         goto again;
1890
1891                 rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1892                 return ret;
1893         }
1894
1895         kfree(lock_tag);
1896         if (num_lockers == 0)
1897                 goto again;
1898
1899         rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1900                  ENTITY_NAME(lockers[0].id.name));
1901
1902         ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1903                                   RBD_LOCK_NAME, lockers[0].id.cookie,
1904                                   &lockers[0].id.name);
1905         ceph_free_lockers(lockers, num_lockers);
1906         if (ret) {
1907                 if (ret == -ENOENT)
1908                         goto again;
1909
1910                 rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1911                 return ret;
1912         }
1913
1914         broke_lock = true;
1915         goto again;
1916 }
1917
1918 static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1919 {
1920         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1921         CEPH_DEFINE_OID_ONSTACK(oid);
1922         int ret;
1923
1924         rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1925
1926         ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1927                               "");
1928         if (ret && ret != -ENOENT)
1929                 rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1930 }
1931
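/*
 * The object_map_load reply starts with a le32 length of the encoded
 * BitVector header, followed by the header itself: a versioned struct
 * whose first field is the object count (checked against the expected
 * number of objects by the caller).  Decode that count and skip to the
 * end of the header, leaving *p at the start of the map data.
 */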
1932 static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1933 {
1934         u8 struct_v;
1935         u32 struct_len;
1936         u32 header_len;
1937         void *header_end;
1938         int ret;
1939
1940         ceph_decode_32_safe(p, end, header_len, e_inval);
1941         header_end = *p + header_len;
1942
1943         ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1944                                   &struct_len);
1945         if (ret)
1946                 return ret;
1947
1948         ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1949
1950         *p = header_end;
1951         return 0;
1952
1953 e_inval:
1954         return -EINVAL;
1955 }
1956
1957 static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1958 {
1959         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1960         CEPH_DEFINE_OID_ONSTACK(oid);
1961         struct page **pages;
1962         void *p, *end;
1963         size_t reply_len;
1964         u64 num_objects;
1965         u64 object_map_bytes;
1966         u64 object_map_size;
1967         int num_pages;
1968         int ret;
1969
1970         rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1971
1972         num_objects = ceph_get_num_objects(&rbd_dev->layout,
1973                                            rbd_dev->mapping.size);
1974         object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1975                                             BITS_PER_BYTE);
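        /* +1 page of slack for the encoded header that precedes the map data */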
1976         num_pages = calc_pages_for(0, object_map_bytes) + 1;
1977         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1978         if (IS_ERR(pages))
1979                 return PTR_ERR(pages);
1980
1981         reply_len = num_pages * PAGE_SIZE;
1982         rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1983         ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1984                              "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1985                              NULL, 0, pages, &reply_len);
1986         if (ret)
1987                 goto out;
1988
1989         p = page_address(pages[0]);
1990         end = p + min(reply_len, (size_t)PAGE_SIZE);
1991         ret = decode_object_map_header(&p, end, &object_map_size);
1992         if (ret)
1993                 goto out;
1994
1995         if (object_map_size != num_objects) {
1996                 rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1997                          object_map_size, num_objects);
1998                 ret = -EINVAL;
1999                 goto out;
2000         }
2001
2002         if (offset_in_page(p) + object_map_bytes > reply_len) {
2003                 ret = -EINVAL;
2004                 goto out;
2005         }
2006
2007         rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
2008         if (!rbd_dev->object_map) {
2009                 ret = -ENOMEM;
2010                 goto out;
2011         }
2012
2013         rbd_dev->object_map_size = object_map_size;
2014         ceph_copy_from_page_vector(pages, rbd_dev->object_map,
2015                                    offset_in_page(p), object_map_bytes);
2016
2017 out:
2018         ceph_release_page_vector(pages, num_pages);
2019         return ret;
2020 }
2021
2022 static void rbd_object_map_free(struct rbd_device *rbd_dev)
2023 {
2024         kvfree(rbd_dev->object_map);
2025         rbd_dev->object_map = NULL;
2026         rbd_dev->object_map_size = 0;
2027 }
2028
2029 static int rbd_object_map_load(struct rbd_device *rbd_dev)
2030 {
2031         int ret;
2032
2033         ret = __rbd_object_map_load(rbd_dev);
2034         if (ret)
2035                 return ret;
2036
2037         ret = rbd_dev_v2_get_flags(rbd_dev);
2038         if (ret) {
2039                 rbd_object_map_free(rbd_dev);
2040                 return ret;
2041         }
2042
2043         if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
2044                 rbd_warn(rbd_dev, "object map is invalid");
2045
2046         return 0;
2047 }
2048
2049 static int rbd_object_map_open(struct rbd_device *rbd_dev)
2050 {
2051         int ret;
2052
2053         ret = rbd_object_map_lock(rbd_dev);
2054         if (ret)
2055                 return ret;
2056
2057         ret = rbd_object_map_load(rbd_dev);
2058         if (ret) {
2059                 rbd_object_map_unlock(rbd_dev);
2060                 return ret;
2061         }
2062
2063         return 0;
2064 }
2065
2066 static void rbd_object_map_close(struct rbd_device *rbd_dev)
2067 {
2068         rbd_object_map_free(rbd_dev);
2069         rbd_object_map_unlock(rbd_dev);
2070 }
2071
/*
 * This function needs snap_id (or more precisely just something to
 * distinguish between HEAD and snapshot object maps), new_state and
 * current_state that were passed to rbd_object_map_update().
 *
 * To avoid allocating and stashing a context we piggyback on the OSD
 * request: a HEAD update carries two ops (assert_locked comes first),
 * so snap_id can be inferred from r_num_ops, while new_state and
 * current_state are recovered by decoding our own object_map_update
 * op, encoded in rbd_cls_object_map_update().
 */
2082 static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
2083                                         struct ceph_osd_request *osd_req)
2084 {
2085         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2086         struct ceph_osd_data *osd_data;
2087         u64 objno;
2088         u8 state, new_state, uninitialized_var(current_state);
2089         bool has_current_state;
2090         void *p;
2091
2092         if (osd_req->r_result)
2093                 return osd_req->r_result;
2094
2095         /*
2096          * Nothing to do for a snapshot object map.
2097          */
2098         if (osd_req->r_num_ops == 1)
2099                 return 0;
2100
2101         /*
2102          * Update in-memory HEAD object map.
2103          */
2104         rbd_assert(osd_req->r_num_ops == 2);
2105         osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
2106         rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
2107
2108         p = page_address(osd_data->pages[0]);
2109         objno = ceph_decode_64(&p);
2110         rbd_assert(objno == obj_req->ex.oe_objno);
2111         rbd_assert(ceph_decode_64(&p) == objno + 1);
2112         new_state = ceph_decode_8(&p);
2113         has_current_state = ceph_decode_8(&p);
2114         if (has_current_state)
2115                 current_state = ceph_decode_8(&p);
2116
2117         spin_lock(&rbd_dev->object_map_lock);
2118         state = __rbd_object_map_get(rbd_dev, objno);
2119         if (!has_current_state || current_state == state ||
2120             (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
2121                 __rbd_object_map_set(rbd_dev, objno, new_state);
2122         spin_unlock(&rbd_dev->object_map_lock);
2123
2124         return 0;
2125 }
2126
2127 static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
2128 {
2129         struct rbd_obj_request *obj_req = osd_req->r_priv;
2130         int result;
2131
2132         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
2133              osd_req->r_result, obj_req);
2134
2135         result = rbd_object_map_update_finish(obj_req, osd_req);
2136         rbd_obj_handle_request(obj_req, result);
2137 }
2138
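/*
 * Decide whether an object map update is actually needed.  No update is
 * sent if the map already records new_state, if an object known to be
 * NONEXISTENT would merely be marked PENDING, or if an object that is
 * not currently PENDING would be marked NONEXISTENT.
 */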
2139 static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
2140 {
2141         u8 state = rbd_object_map_get(rbd_dev, objno);
2142
2143         if (state == new_state ||
2144             (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2145             (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2146                 return false;
2147
2148         return true;
2149 }
2150
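/*
 * Encode the request payload for an "rbd" class object_map_update
 * call: the half-open object range [objno, objno + 1), the new state
 * and an optional current-state assertion, packed into a single page.
 */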
2151 static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2152                                      int which, u64 objno, u8 new_state,
2153                                      const u8 *current_state)
2154 {
2155         struct page **pages;
2156         void *p, *start;
2157         int ret;
2158
2159         ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2160         if (ret)
2161                 return ret;
2162
2163         pages = ceph_alloc_page_vector(1, GFP_NOIO);
2164         if (IS_ERR(pages))
2165                 return PTR_ERR(pages);
2166
2167         p = start = page_address(pages[0]);
2168         ceph_encode_64(&p, objno);
2169         ceph_encode_64(&p, objno + 1);
2170         ceph_encode_8(&p, new_state);
2171         if (current_state) {
2172                 ceph_encode_8(&p, 1);
2173                 ceph_encode_8(&p, *current_state);
2174         } else {
2175                 ceph_encode_8(&p, 0);
2176         }
2177
2178         osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2179                                           false, true);
2180         return 0;
2181 }
2182
2183 /*
2184  * Return:
2185  *   0 - object map update sent
2186  *   1 - object map update isn't needed
2187  *  <0 - error
2188  */
2189 static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2190                                  u8 new_state, const u8 *current_state)
2191 {
2192         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2193         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2194         struct ceph_osd_request *req;
2195         int num_ops = 1;
2196         int which = 0;
2197         int ret;
2198
2199         if (snap_id == CEPH_NOSNAP) {
2200                 if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2201                         return 1;
2202
2203                 num_ops++; /* assert_locked */
2204         }
2205
2206         req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2207         if (!req)
2208                 return -ENOMEM;
2209
2210         list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2211         req->r_callback = rbd_object_map_callback;
2212         req->r_priv = obj_req;
2213
2214         rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2215         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2216         req->r_flags = CEPH_OSD_FLAG_WRITE;
2217         ktime_get_real_ts64(&req->r_mtime);
2218
2219         if (snap_id == CEPH_NOSNAP) {
2220                 /*
2221                  * Protect against possible race conditions during lock
2222                  * ownership transitions.
2223                  */
2224                 ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2225                                              CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2226                 if (ret)
2227                         return ret;
2228         }
2229
2230         ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2231                                         new_state, current_state);
2232         if (ret)
2233                 return ret;
2234
2235         ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2236         if (ret)
2237                 return ret;
2238
2239         ceph_osdc_start_request(osdc, req, false);
2240         return 0;
2241 }
2242
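/*
 * Clip the image extents to the given overlap: drop extents that lie
 * entirely beyond it and trim a final extent that straddles it.
 */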
2243 static void prune_extents(struct ceph_file_extent *img_extents,
2244                           u32 *num_img_extents, u64 overlap)
2245 {
2246         u32 cnt = *num_img_extents;
2247
2248         /* drop extents completely beyond the overlap */
2249         while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2250                 cnt--;
2251
2252         if (cnt) {
2253                 struct ceph_file_extent *ex = &img_extents[cnt - 1];
2254
2255                 /* trim final overlapping extent */
2256                 if (ex->fe_off + ex->fe_len > overlap)
2257                         ex->fe_len = overlap - ex->fe_off;
2258         }
2259
2260         *num_img_extents = cnt;
2261 }
2262
2263 /*
2264  * Determine the byte range(s) covered by either just the object extent
2265  * or the entire object in the parent image.
2266  */
2267 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2268                                     bool entire)
2269 {
2270         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2271         int ret;
2272
2273         if (!rbd_dev->parent_overlap)
2274                 return 0;
2275
2276         ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2277                                   entire ? 0 : obj_req->ex.oe_off,
2278                                   entire ? rbd_dev->layout.object_size :
2279                                                         obj_req->ex.oe_len,
2280                                   &obj_req->img_extents,
2281                                   &obj_req->num_img_extents);
2282         if (ret)
2283                 return ret;
2284
2285         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2286                       rbd_dev->parent_overlap);
2287         return 0;
2288 }
2289
2290 static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2291 {
2292         struct rbd_obj_request *obj_req = osd_req->r_priv;
2293
2294         switch (obj_req->img_request->data_type) {
2295         case OBJ_REQUEST_BIO:
2296                 osd_req_op_extent_osd_data_bio(osd_req, which,
2297                                                &obj_req->bio_pos,
2298                                                obj_req->ex.oe_len);
2299                 break;
2300         case OBJ_REQUEST_BVECS:
2301         case OBJ_REQUEST_OWN_BVECS:
2302                 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2303                                                         obj_req->ex.oe_len);
2304                 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2305                 osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2306                                                     &obj_req->bvec_pos);
2307                 break;
2308         default:
2309                 BUG();
2310         }
2311 }
2312
2313 static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2314 {
2315         struct page **pages;
2316
2317         /*
2318          * The response data for a STAT call consists of:
2319          *     le64 length;
2320          *     struct {
2321          *         le32 tv_sec;
2322          *         le32 tv_nsec;
2323          *     } mtime;
2324          */
2325         pages = ceph_alloc_page_vector(1, GFP_NOIO);
2326         if (IS_ERR(pages))
2327                 return PTR_ERR(pages);
2328
2329         osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2330         osd_req_op_raw_data_in_pages(osd_req, which, pages,
2331                                      8 + sizeof(struct ceph_timespec),
2332                                      0, false, true);
2333         return 0;
2334 }
2335
2336 static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2337                                 u32 bytes)
2338 {
2339         struct rbd_obj_request *obj_req = osd_req->r_priv;
2340         int ret;
2341
2342         ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2343         if (ret)
2344                 return ret;
2345
2346         osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2347                                           obj_req->copyup_bvec_count, bytes);
2348         return 0;
2349 }
2350
2351 static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2352 {
2353         obj_req->read_state = RBD_OBJ_READ_START;
2354         return 0;
2355 }
2356
2357 static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2358                                       int which)
2359 {
2360         struct rbd_obj_request *obj_req = osd_req->r_priv;
2361         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2362         u16 opcode;
2363
2364         if (!use_object_map(rbd_dev) ||
2365             !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2366                 osd_req_op_alloc_hint_init(osd_req, which++,
2367                                            rbd_dev->layout.object_size,
2368                                            rbd_dev->layout.object_size);
2369         }
2370
2371         if (rbd_obj_is_entire(obj_req))
2372                 opcode = CEPH_OSD_OP_WRITEFULL;
2373         else
2374                 opcode = CEPH_OSD_OP_WRITE;
2375
2376         osd_req_op_extent_init(osd_req, which, opcode,
2377                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2378         rbd_osd_setup_data(osd_req, which);
2379 }
2380
2381 static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2382 {
2383         int ret;
2384
2385         /* reverse map the entire object onto the parent */
2386         ret = rbd_obj_calc_img_extents(obj_req, true);
2387         if (ret)
2388                 return ret;
2389
2390         if (rbd_obj_copyup_enabled(obj_req))
2391                 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2392
2393         obj_req->write_state = RBD_OBJ_WRITE_START;
2394         return 0;
2395 }
2396
2397 static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2398 {
2399         return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2400                                           CEPH_OSD_OP_ZERO;
2401 }
2402
2403 static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2404                                         int which)
2405 {
2406         struct rbd_obj_request *obj_req = osd_req->r_priv;
2407
2408         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2409                 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2410                 osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2411         } else {
2412                 osd_req_op_extent_init(osd_req, which,
2413                                        truncate_or_zero_opcode(obj_req),
2414                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2415                                        0, 0);
2416         }
2417 }
2418
2419 static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2420 {
2421         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2422         u64 off, next_off;
2423         int ret;
2424
2425         /*
         * Align the range to the alloc_size boundary and punt on discards
2427          * that are too small to free up any space.
2428          *
2429          * alloc_size == object_size && is_tail() is a special case for
2430          * filestore with filestore_punch_hole = false, needed to allow
2431          * truncate (in addition to delete).
2432          */
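        /*
         * For example, with a 64K alloc_size, a discard of bytes
         * 4K..196K within an object is trimmed to 64K..192K, while a
         * discard of 4K..60K is dropped entirely (returns 1).
         */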
2433         if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2434             !rbd_obj_is_tail(obj_req)) {
2435                 off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2436                 next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2437                                       rbd_dev->opts->alloc_size);
2438                 if (off >= next_off)
2439                         return 1;
2440
2441                 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2442                      obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2443                      off, next_off - off);
2444                 obj_req->ex.oe_off = off;
2445                 obj_req->ex.oe_len = next_off - off;
2446         }
2447
2448         /* reverse map the entire object onto the parent */
2449         ret = rbd_obj_calc_img_extents(obj_req, true);
2450         if (ret)
2451                 return ret;
2452
2453         obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2454         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2455                 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2456
2457         obj_req->write_state = RBD_OBJ_WRITE_START;
2458         return 0;
2459 }
2460
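/*
 * A zeroout covering the entire object either truncates it to zero
 * length (optionally preceded by a create when there is parent data and
 * copyup is not enabled) or deletes it outright when there is no parent
 * data.  A partial zeroout truncates or zeroes just the affected range.
 */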
2461 static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2462                                         int which)
2463 {
2464         struct rbd_obj_request *obj_req = osd_req->r_priv;
2465         u16 opcode;
2466
2467         if (rbd_obj_is_entire(obj_req)) {
2468                 if (obj_req->num_img_extents) {
2469                         if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2470                                 osd_req_op_init(osd_req, which++,
2471                                                 CEPH_OSD_OP_CREATE, 0);
2472                         opcode = CEPH_OSD_OP_TRUNCATE;
2473                 } else {
2474                         rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2475                         osd_req_op_init(osd_req, which++,
2476                                         CEPH_OSD_OP_DELETE, 0);
2477                         opcode = 0;
2478                 }
2479         } else {
2480                 opcode = truncate_or_zero_opcode(obj_req);
2481         }
2482
2483         if (opcode)
2484                 osd_req_op_extent_init(osd_req, which, opcode,
2485                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2486                                        0, 0);
2487 }
2488
2489 static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2490 {
2491         int ret;
2492
2493         /* reverse map the entire object onto the parent */
2494         ret = rbd_obj_calc_img_extents(obj_req, true);
2495         if (ret)
2496                 return ret;
2497
2498         if (rbd_obj_copyup_enabled(obj_req))
2499                 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2500         if (!obj_req->num_img_extents) {
2501                 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2502                 if (rbd_obj_is_entire(obj_req))
2503                         obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2504         }
2505
2506         obj_req->write_state = RBD_OBJ_WRITE_START;
2507         return 0;
2508 }
2509
2510 static int count_write_ops(struct rbd_obj_request *obj_req)
2511 {
2512         struct rbd_img_request *img_req = obj_req->img_request;
2513
2514         switch (img_req->op_type) {
2515         case OBJ_OP_WRITE:
2516                 if (!use_object_map(img_req->rbd_dev) ||
2517                     !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2518                         return 2; /* setallochint + write/writefull */
2519
2520                 return 1; /* write/writefull */
2521         case OBJ_OP_DISCARD:
2522                 return 1; /* delete/truncate/zero */
2523         case OBJ_OP_ZEROOUT:
2524                 if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2525                     !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2526                         return 2; /* create + truncate */
2527
2528                 return 1; /* delete/truncate/zero */
2529         default:
2530                 BUG();
2531         }
2532 }
2533
2534 static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2535                                     int which)
2536 {
2537         struct rbd_obj_request *obj_req = osd_req->r_priv;
2538
2539         switch (obj_req->img_request->op_type) {
2540         case OBJ_OP_WRITE:
2541                 __rbd_osd_setup_write_ops(osd_req, which);
2542                 break;
2543         case OBJ_OP_DISCARD:
2544                 __rbd_osd_setup_discard_ops(osd_req, which);
2545                 break;
2546         case OBJ_OP_ZEROOUT:
2547                 __rbd_osd_setup_zeroout_ops(osd_req, which);
2548                 break;
2549         default:
2550                 BUG();
2551         }
2552 }
2553
2554 /*
2555  * Prune the list of object requests (adjust offset and/or length, drop
2556  * redundant requests).  Prepare object request state machines and image
2557  * request state machine for execution.
2558  */
2559 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2560 {
2561         struct rbd_obj_request *obj_req, *next_obj_req;
2562         int ret;
2563
2564         for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2565                 switch (img_req->op_type) {
2566                 case OBJ_OP_READ:
2567                         ret = rbd_obj_init_read(obj_req);
2568                         break;
2569                 case OBJ_OP_WRITE:
2570                         ret = rbd_obj_init_write(obj_req);
2571                         break;
2572                 case OBJ_OP_DISCARD:
2573                         ret = rbd_obj_init_discard(obj_req);
2574                         break;
2575                 case OBJ_OP_ZEROOUT:
2576                         ret = rbd_obj_init_zeroout(obj_req);
2577                         break;
2578                 default:
2579                         BUG();
2580                 }
2581                 if (ret < 0)
2582                         return ret;
2583                 if (ret > 0) {
2584                         rbd_img_obj_request_del(img_req, obj_req);
2585                         continue;
2586                 }
2587         }
2588
2589         img_req->state = RBD_IMG_START;
2590         return 0;
2591 }
2592
2593 union rbd_img_fill_iter {
2594         struct ceph_bio_iter    bio_iter;
2595         struct ceph_bvec_iter   bvec_iter;
2596 };
2597
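/*
 * Context for mapping image extents to object requests.  @pos_type and
 * @pos describe the caller's data buffer (a bio chain or bio_vec
 * array); @iter is a working copy of @pos.  The nocopy path uses
 * @set_pos_fn once per object, while the fancy-striping path uses
 * @count_fn and @copy_fn in two passes to build private bio_vec arrays.
 */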
2598 struct rbd_img_fill_ctx {
2599         enum obj_request_type   pos_type;
2600         union rbd_img_fill_iter *pos;
2601         union rbd_img_fill_iter iter;
2602         ceph_object_extent_fn_t set_pos_fn;
2603         ceph_object_extent_fn_t count_fn;
2604         ceph_object_extent_fn_t copy_fn;
2605 };
2606
2607 static struct ceph_object_extent *alloc_object_extent(void *arg)
2608 {
2609         struct rbd_img_request *img_req = arg;
2610         struct rbd_obj_request *obj_req;
2611
2612         obj_req = rbd_obj_request_create();
2613         if (!obj_req)
2614                 return NULL;
2615
2616         rbd_img_obj_request_add(img_req, obj_req);
2617         return &obj_req->ex;
2618 }
2619
2620 /*
2621  * While su != os && sc == 1 is technically not fancy (it's the same
2622  * layout as su == os && sc == 1), we can't use the nocopy path for it
2623  * because ->set_pos_fn() should be called only once per object.
2624  * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2625  * treat su != os && sc == 1 as fancy.
2626  */
2627 static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2628 {
2629         return l->stripe_unit != l->object_size;
2630 }
2631
2632 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2633                                        struct ceph_file_extent *img_extents,
2634                                        u32 num_img_extents,
2635                                        struct rbd_img_fill_ctx *fctx)
2636 {
2637         u32 i;
2638         int ret;
2639
2640         img_req->data_type = fctx->pos_type;
2641
2642         /*
2643          * Create object requests and set each object request's starting
2644          * position in the provided bio (list) or bio_vec array.
2645          */
2646         fctx->iter = *fctx->pos;
2647         for (i = 0; i < num_img_extents; i++) {
2648                 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2649                                            img_extents[i].fe_off,
2650                                            img_extents[i].fe_len,
2651                                            &img_req->object_extents,
2652                                            alloc_object_extent, img_req,
2653                                            fctx->set_pos_fn, &fctx->iter);
2654                 if (ret)
2655                         return ret;
2656         }
2657
2658         return __rbd_img_fill_request(img_req);
2659 }
2660
2661 /*
2662  * Map a list of image extents to a list of object extents, create the
2663  * corresponding object requests (normally each to a different object,
2664  * but not always) and add them to @img_req.  For each object request,
2665  * set up its data descriptor to point to the corresponding chunk(s) of
2666  * @fctx->pos data buffer.
2667  *
2668  * Because ceph_file_to_extents() will merge adjacent object extents
2669  * together, each object request's data descriptor may point to multiple
2670  * different chunks of @fctx->pos data buffer.
2671  *
2672  * @fctx->pos data buffer is assumed to be large enough.
2673  */
2674 static int rbd_img_fill_request(struct rbd_img_request *img_req,
2675                                 struct ceph_file_extent *img_extents,
2676                                 u32 num_img_extents,
2677                                 struct rbd_img_fill_ctx *fctx)
2678 {
2679         struct rbd_device *rbd_dev = img_req->rbd_dev;
2680         struct rbd_obj_request *obj_req;
2681         u32 i;
2682         int ret;
2683
2684         if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2685             !rbd_layout_is_fancy(&rbd_dev->layout))
2686                 return rbd_img_fill_request_nocopy(img_req, img_extents,
2687                                                    num_img_extents, fctx);
2688
2689         img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2690
2691         /*
2692          * Create object requests and determine ->bvec_count for each object
2693          * request.  Note that ->bvec_count sum over all object requests may
2694          * be greater than the number of bio_vecs in the provided bio (list)
2695          * or bio_vec array because when mapped, those bio_vecs can straddle
2696          * stripe unit boundaries.
2697          */
2698         fctx->iter = *fctx->pos;
2699         for (i = 0; i < num_img_extents; i++) {
2700                 ret = ceph_file_to_extents(&rbd_dev->layout,
2701                                            img_extents[i].fe_off,
2702                                            img_extents[i].fe_len,
2703                                            &img_req->object_extents,
2704                                            alloc_object_extent, img_req,
2705                                            fctx->count_fn, &fctx->iter);
2706                 if (ret)
2707                         return ret;
2708         }
2709
2710         for_each_obj_request(img_req, obj_req) {
2711                 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2712                                               sizeof(*obj_req->bvec_pos.bvecs),
2713                                               GFP_NOIO);
2714                 if (!obj_req->bvec_pos.bvecs)
2715                         return -ENOMEM;
2716         }
2717
2718         /*
2719          * Fill in each object request's private bio_vec array, splitting and
2720          * rearranging the provided bio_vecs in stripe unit chunks as needed.
2721          */
2722         fctx->iter = *fctx->pos;
2723         for (i = 0; i < num_img_extents; i++) {
2724                 ret = ceph_iterate_extents(&rbd_dev->layout,
2725                                            img_extents[i].fe_off,
2726                                            img_extents[i].fe_len,
2727                                            &img_req->object_extents,
2728                                            fctx->copy_fn, &fctx->iter);
2729                 if (ret)
2730                         return ret;
2731         }
2732
2733         return __rbd_img_fill_request(img_req);
2734 }
2735
2736 static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2737                                u64 off, u64 len)
2738 {
2739         struct ceph_file_extent ex = { off, len };
2740         union rbd_img_fill_iter dummy;
2741         struct rbd_img_fill_ctx fctx = {
2742                 .pos_type = OBJ_REQUEST_NODATA,
2743                 .pos = &dummy,
2744         };
2745
2746         return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2747 }
2748
2749 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2750 {
2751         struct rbd_obj_request *obj_req =
2752             container_of(ex, struct rbd_obj_request, ex);
2753         struct ceph_bio_iter *it = arg;
2754
2755         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2756         obj_req->bio_pos = *it;
2757         ceph_bio_iter_advance(it, bytes);
2758 }
2759
2760 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2761 {
2762         struct rbd_obj_request *obj_req =
2763             container_of(ex, struct rbd_obj_request, ex);
2764         struct ceph_bio_iter *it = arg;
2765
2766         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2767         ceph_bio_iter_advance_step(it, bytes, ({
2768                 obj_req->bvec_count++;
2769         }));
}
2772
2773 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2774 {
2775         struct rbd_obj_request *obj_req =
2776             container_of(ex, struct rbd_obj_request, ex);
2777         struct ceph_bio_iter *it = arg;
2778
2779         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2780         ceph_bio_iter_advance_step(it, bytes, ({
2781                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2782                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2783         }));
2784 }
2785
2786 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2787                                    struct ceph_file_extent *img_extents,
2788                                    u32 num_img_extents,
2789                                    struct ceph_bio_iter *bio_pos)
2790 {
2791         struct rbd_img_fill_ctx fctx = {
2792                 .pos_type = OBJ_REQUEST_BIO,
2793                 .pos = (union rbd_img_fill_iter *)bio_pos,
2794                 .set_pos_fn = set_bio_pos,
2795                 .count_fn = count_bio_bvecs,
2796                 .copy_fn = copy_bio_bvecs,
2797         };
2798
2799         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2800                                     &fctx);
2801 }
2802
2803 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2804                                  u64 off, u64 len, struct bio *bio)
2805 {
2806         struct ceph_file_extent ex = { off, len };
2807         struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2808
2809         return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2810 }
2811
2812 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2813 {
2814         struct rbd_obj_request *obj_req =
2815             container_of(ex, struct rbd_obj_request, ex);
2816         struct ceph_bvec_iter *it = arg;
2817
2818         obj_req->bvec_pos = *it;
2819         ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2820         ceph_bvec_iter_advance(it, bytes);
2821 }
2822
2823 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2824 {
2825         struct rbd_obj_request *obj_req =
2826             container_of(ex, struct rbd_obj_request, ex);
2827         struct ceph_bvec_iter *it = arg;
2828
2829         ceph_bvec_iter_advance_step(it, bytes, ({
2830                 obj_req->bvec_count++;
2831         }));
2832 }
2833
2834 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2835 {
2836         struct rbd_obj_request *obj_req =
2837             container_of(ex, struct rbd_obj_request, ex);
2838         struct ceph_bvec_iter *it = arg;
2839
2840         ceph_bvec_iter_advance_step(it, bytes, ({
2841                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2842                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2843         }));
2844 }
2845
2846 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2847                                      struct ceph_file_extent *img_extents,
2848                                      u32 num_img_extents,
2849                                      struct ceph_bvec_iter *bvec_pos)
2850 {
2851         struct rbd_img_fill_ctx fctx = {
2852                 .pos_type = OBJ_REQUEST_BVECS,
2853                 .pos = (union rbd_img_fill_iter *)bvec_pos,
2854                 .set_pos_fn = set_bvec_pos,
2855                 .count_fn = count_bvecs,
2856                 .copy_fn = copy_bvecs,
2857         };
2858
2859         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2860                                     &fctx);
2861 }
2862
2863 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2864                                    struct ceph_file_extent *img_extents,
2865                                    u32 num_img_extents,
2866                                    struct bio_vec *bvecs)
2867 {
2868         struct ceph_bvec_iter it = {
2869                 .bvecs = bvecs,
2870                 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2871                                                              num_img_extents) },
2872         };
2873
2874         return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2875                                          &it);
2876 }
2877
2878 static void rbd_img_handle_request_work(struct work_struct *work)
2879 {
2880         struct rbd_img_request *img_req =
2881             container_of(work, struct rbd_img_request, work);
2882
2883         rbd_img_handle_request(img_req, img_req->work_result);
2884 }
2885
2886 static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2887 {
2888         INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2889         img_req->work_result = result;
2890         queue_work(rbd_wq, &img_req->work);
2891 }
2892
2893 static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2894 {
2895         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2896
2897         if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2898                 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2899                 return true;
2900         }
2901
2902         dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2903              obj_req->ex.oe_objno);
2904         return false;
2905 }
2906
2907 static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2908 {
2909         struct ceph_osd_request *osd_req;
2910         int ret;
2911
2912         osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2913         if (IS_ERR(osd_req))
2914                 return PTR_ERR(osd_req);
2915
2916         osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2917                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2918         rbd_osd_setup_data(osd_req, 0);
2919         rbd_osd_format_read(osd_req);
2920
2921         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2922         if (ret)
2923                 return ret;
2924
2925         rbd_osd_submit(osd_req);
2926         return 0;
2927 }
2928
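/*
 * Read the parent data covering this object by issuing a child image
 * request against the parent image.  For a read, the data lands
 * directly in the original bio/bvecs; for a write (copyup), it is read
 * into obj_req->copyup_bvecs.
 */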
2929 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2930 {
2931         struct rbd_img_request *img_req = obj_req->img_request;
2932         struct rbd_img_request *child_img_req;
2933         int ret;
2934
2935         child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2936                                                OBJ_OP_READ, NULL);
2937         if (!child_img_req)
2938                 return -ENOMEM;
2939
2940         __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2941         child_img_req->obj_request = obj_req;
2942
2943         dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2944              obj_req);
2945
2946         if (!rbd_img_is_write(img_req)) {
2947                 switch (img_req->data_type) {
2948                 case OBJ_REQUEST_BIO:
2949                         ret = __rbd_img_fill_from_bio(child_img_req,
2950                                                       obj_req->img_extents,
2951                                                       obj_req->num_img_extents,
2952                                                       &obj_req->bio_pos);
2953                         break;
2954                 case OBJ_REQUEST_BVECS:
2955                 case OBJ_REQUEST_OWN_BVECS:
2956                         ret = __rbd_img_fill_from_bvecs(child_img_req,
2957                                                       obj_req->img_extents,
2958                                                       obj_req->num_img_extents,
2959                                                       &obj_req->bvec_pos);
2960                         break;
2961                 default:
2962                         BUG();
2963                 }
2964         } else {
2965                 ret = rbd_img_fill_from_bvecs(child_img_req,
2966                                               obj_req->img_extents,
2967                                               obj_req->num_img_extents,
2968                                               obj_req->copyup_bvecs);
2969         }
2970         if (ret) {
2971                 rbd_img_request_put(child_img_req);
2972                 return ret;
2973         }
2974
2975         /* avoid parent chain recursion */
2976         rbd_img_schedule(child_img_req, 0);
2977         return 0;
2978 }
2979
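/*
 * Advance the object read state machine.  Returns true when the object
 * request is done and *result holds the outcome, false when it is still
 * waiting on an in-flight OSD request or child (parent image) request.
 */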
2980 static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2981 {
2982         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2983         int ret;
2984
2985 again:
2986         switch (obj_req->read_state) {
2987         case RBD_OBJ_READ_START:
2988                 rbd_assert(!*result);
2989
2990                 if (!rbd_obj_may_exist(obj_req)) {
2991                         *result = -ENOENT;
2992                         obj_req->read_state = RBD_OBJ_READ_OBJECT;
2993                         goto again;
2994                 }
2995
2996                 ret = rbd_obj_read_object(obj_req);
2997                 if (ret) {
2998                         *result = ret;
2999                         return true;
3000                 }
3001                 obj_req->read_state = RBD_OBJ_READ_OBJECT;
3002                 return false;
3003         case RBD_OBJ_READ_OBJECT:
3004                 if (*result == -ENOENT && rbd_dev->parent_overlap) {
3005                         /* reverse map this object extent onto the parent */
3006                         ret = rbd_obj_calc_img_extents(obj_req, false);
3007                         if (ret) {
3008                                 *result = ret;
3009                                 return true;
3010                         }
3011                         if (obj_req->num_img_extents) {
3012                                 ret = rbd_obj_read_from_parent(obj_req);
3013                                 if (ret) {
3014                                         *result = ret;
3015                                         return true;
3016                                 }
3017                                 obj_req->read_state = RBD_OBJ_READ_PARENT;
3018                                 return false;
3019                         }
3020                 }
3021
3022                 /*
3023                  * -ENOENT means a hole in the image -- zero-fill the entire
3024                  * length of the request.  A short read also implies zero-fill
3025                  * to the end of the request.
3026                  */
3027                 if (*result == -ENOENT) {
3028                         rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
3029                         *result = 0;
3030                 } else if (*result >= 0) {
3031                         if (*result < obj_req->ex.oe_len)
3032                                 rbd_obj_zero_range(obj_req, *result,
3033                                                 obj_req->ex.oe_len - *result);
3034                         else
3035                                 rbd_assert(*result == obj_req->ex.oe_len);
3036                         *result = 0;
3037                 }
3038                 return true;
3039         case RBD_OBJ_READ_PARENT:
3040                 /*
3041                  * The parent image is read only up to the overlap point --
3042                  * zero-fill from the overlap to the end of the request.
3043                  */
3044                 if (!*result) {
3045                         u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
3046
3047                         if (obj_overlap < obj_req->ex.oe_len)
3048                                 rbd_obj_zero_range(obj_req, obj_overlap,
3049                                             obj_req->ex.oe_len - obj_overlap);
3050                 }
3051                 return true;
3052         default:
3053                 BUG();
3054         }
3055 }
3056
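/*
 * Consult the object map (if enabled) and latch RBD_OBJ_FLAG_MAY_EXIST.
 * Return true if the write can be skipped entirely because the target
 * object doesn't exist and the request is flagged as a no-op for
 * nonexistent objects.
 */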
3057 static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
3058 {
3059         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3060
3061         if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
3062                 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
3063
3064         if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
3065             (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
3066                 dout("%s %p noop for nonexistent\n", __func__, obj_req);
3067                 return true;
3068         }
3069
3070         return false;
3071 }
3072
3073 /*
3074  * Return:
3075  *   0 - object map update sent
3076  *   1 - object map update isn't needed
3077  *  <0 - error
3078  */
3079 static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
3080 {
3081         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3082         u8 new_state;
3083
3084         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3085                 return 1;
3086
3087         if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3088                 new_state = OBJECT_PENDING;
3089         else
3090                 new_state = OBJECT_EXISTS;
3091
3092         return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
3093 }
3094
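/*
 * Allocate and submit the OSD request carrying the write ops.  If
 * copyup is enabled, a stat op is prepended as a guard so that a write
 * to a nonexistent object fails with -ENOENT and enters the copyup
 * state machine instead of implicitly creating the object.
 */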
3095 static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
3096 {
3097         struct ceph_osd_request *osd_req;
3098         int num_ops = count_write_ops(obj_req);
3099         int which = 0;
3100         int ret;
3101
3102         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
3103                 num_ops++; /* stat */
3104
3105         osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3106         if (IS_ERR(osd_req))
3107                 return PTR_ERR(osd_req);
3108
3109         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3110                 ret = rbd_osd_setup_stat(osd_req, which++);
3111                 if (ret)
3112                         return ret;
3113         }
3114
3115         rbd_osd_setup_write_ops(osd_req, which);
3116         rbd_osd_format_write(osd_req);
3117
3118         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3119         if (ret)
3120                 return ret;
3121
3122         rbd_osd_submit(osd_req);
3123         return 0;
3124 }
3125
3126 /*
3127  * copyup_bvecs pages are never highmem pages
3128  */
3129 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
3130 {
3131         struct ceph_bvec_iter it = {
3132                 .bvecs = bvecs,
3133                 .iter = { .bi_size = bytes },
3134         };
3135
3136         ceph_bvec_iter_advance_step(&it, bytes, ({
3137                 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
3138                                bv.bv_len))
3139                         return false;
3140         }));
3141         return true;
3142 }
3143
3144 #define MODS_ONLY       U32_MAX
3145
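/*
 * Send a copyup request with an empty snapshot context -- the OSD
 * deep-copyups the parent data through all existing snapshots of the
 * object.  The actual modification is sent separately, with the
 * current snapshot context (see rbd_obj_copyup_current_snapc()).
 */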
3146 static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3147                                       u32 bytes)
3148 {
3149         struct ceph_osd_request *osd_req;
3150         int ret;
3151
3152         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3153         rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3154
3155         osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3156         if (IS_ERR(osd_req))
3157                 return PTR_ERR(osd_req);
3158
3159         ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3160         if (ret)
3161                 return ret;
3162
3163         rbd_osd_format_write(osd_req);
3164
3165         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3166         if (ret)
3167                 return ret;
3168
3169         rbd_osd_submit(osd_req);
3170         return 0;
3171 }
3172
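/*
 * Re-submit the original write ops under the current snapshot context,
 * optionally preceded by a copyup op carrying @bytes of parent data
 * (MODS_ONLY means the copyup op is omitted).
 */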
3173 static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3174                                         u32 bytes)
3175 {
3176         struct ceph_osd_request *osd_req;
3177         int num_ops = count_write_ops(obj_req);
3178         int which = 0;
3179         int ret;
3180
3181         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3182
3183         if (bytes != MODS_ONLY)
3184                 num_ops++; /* copyup */
3185
3186         osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3187         if (IS_ERR(osd_req))
3188                 return PTR_ERR(osd_req);
3189
3190         if (bytes != MODS_ONLY) {
3191                 ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3192                 if (ret)
3193                         return ret;
3194         }
3195
3196         rbd_osd_setup_write_ops(osd_req, which);
3197         rbd_osd_format_write(osd_req);
3198
3199         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3200         if (ret)
3201                 return ret;
3202
3203         rbd_osd_submit(osd_req);
3204         return 0;
3205 }
3206
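/*
 * Allocate a bio_vec array, backed by individual pages, big enough to
 * hold @obj_overlap bytes of parent data for the copyup.
 */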
3207 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3208 {
3209         u32 i;
3210
3211         rbd_assert(!obj_req->copyup_bvecs);
3212         obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3213         obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3214                                         sizeof(*obj_req->copyup_bvecs),
3215                                         GFP_NOIO);
3216         if (!obj_req->copyup_bvecs)
3217                 return -ENOMEM;
3218
3219         for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3220                 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3221
3222                 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3223                 if (!obj_req->copyup_bvecs[i].bv_page)
3224                         return -ENOMEM;
3225
3226                 obj_req->copyup_bvecs[i].bv_offset = 0;
3227                 obj_req->copyup_bvecs[i].bv_len = len;
3228                 obj_overlap -= len;
3229         }
3230
3231         rbd_assert(!obj_overlap);
3232         return 0;
3233 }
3234
3235 /*
3236  * The target object doesn't exist.  Read the data for the entire
3237  * target object up to the overlap point (if any) from the parent,
3238  * so we can use it for a copyup.
3239  */
3240 static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3241 {
3242         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3243         int ret;
3244
3245         rbd_assert(obj_req->num_img_extents);
3246         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3247                       rbd_dev->parent_overlap);
3248         if (!obj_req->num_img_extents) {
3249                 /*
3250                  * The overlap has become 0 (most likely because the
3251                  * image has been flattened).  Re-submit the original write
3252                  * request -- pass MODS_ONLY since the copyup isn't needed
3253                  * anymore.
3254                  */
3255                 return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3256         }
3257
3258         ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3259         if (ret)
3260                 return ret;
3261
3262         return rbd_obj_read_from_parent(obj_req);
3263 }
3264
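/*
 * Queue object map updates marking the object as existing in every
 * snapshot of the current snapshot context -- a copyup instantiates
 * the object in all of them (EXISTS_CLEAN is used where fast-diff
 * permits).  Skipped entirely if the copyup data is all zeroes, since
 * zero data is equivalent to the object not existing.
 */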
3265 static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3266 {
3267         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3268         struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3269         u8 new_state;
3270         u32 i;
3271         int ret;
3272
3273         rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3274
3275         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3276                 return;
3277
3278         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3279                 return;
3280
3281         for (i = 0; i < snapc->num_snaps; i++) {
3282                 if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3283                     i + 1 < snapc->num_snaps)
3284                         new_state = OBJECT_EXISTS_CLEAN;
3285                 else
3286                         new_state = OBJECT_EXISTS;
3287
3288                 ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3289                                             new_state, NULL);
3290                 if (ret < 0) {
3291                         obj_req->pending.result = ret;
3292                         return;
3293                 }
3294
3295                 rbd_assert(!ret);
3296                 obj_req->pending.num_pending++;
3297         }
3298 }
3299
3300 static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3301 {
3302         u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3303         int ret;
3304
3305         rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3306
3307         /*
3308          * Only send non-zero copyup data to save some I/O and network
3309          * bandwidth -- zero copyup data is equivalent to the object not
3310          * existing.
3311          */
3312         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3313                 bytes = 0;
3314
3315         if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3316                 /*
3317                  * Send a copyup request with an empty snapshot context to
3318                  * deep-copyup the object through all existing snapshots.
3319                  * A second request with the current snapshot context will be
3320                  * sent for the actual modification.
3321                  */
3322                 ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3323                 if (ret) {
3324                         obj_req->pending.result = ret;
3325                         return;
3326                 }
3327
3328                 obj_req->pending.num_pending++;
3329                 bytes = MODS_ONLY;
3330         }
3331
3332         ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3333         if (ret) {
3334                 obj_req->pending.result = ret;
3335                 return;
3336         }
3337
3338         obj_req->pending.num_pending++;
3339 }
3340
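/*
 * Advance the copyup sub-state machine for @obj_req by one step.
 * Entered from the write state machine when a guarded write fails with
 * -ENOENT.  Returns true when the copyup is done, false while waiting
 * for the parent read, object map updates or copyup writes.
 */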
3341 static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3342 {
3343         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3344         int ret;
3345
3346 again:
3347         switch (obj_req->copyup_state) {
3348         case RBD_OBJ_COPYUP_START:
3349                 rbd_assert(!*result);
3350
3351                 ret = rbd_obj_copyup_read_parent(obj_req);
3352                 if (ret) {
3353                         *result = ret;
3354                         return true;
3355                 }
3356                 if (obj_req->num_img_extents)
3357                         obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3358                 else
3359                         obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3360                 return false;
3361         case RBD_OBJ_COPYUP_READ_PARENT:
3362                 if (*result)
3363                         return true;
3364
3365                 if (is_zero_bvecs(obj_req->copyup_bvecs,
3366                                   rbd_obj_img_extents_bytes(obj_req))) {
3367                         dout("%s %p detected zeros\n", __func__, obj_req);
3368                         obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3369                 }
3370
3371                 rbd_obj_copyup_object_maps(obj_req);
3372                 if (!obj_req->pending.num_pending) {
3373                         *result = obj_req->pending.result;
3374                         obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3375                         goto again;
3376                 }
3377                 obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3378                 return false;
3379         case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3380                 if (!pending_result_dec(&obj_req->pending, result))
3381                         return false;
3382                 /* fall through */
3383         case RBD_OBJ_COPYUP_OBJECT_MAPS:
3384                 if (*result) {
3385                         rbd_warn(rbd_dev, "snap object map update failed: %d",
3386                                  *result);
3387                         return true;
3388                 }
3389
3390                 rbd_obj_copyup_write_object(obj_req);
3391                 if (!obj_req->pending.num_pending) {
3392                         *result = obj_req->pending.result;
3393                         obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3394                         goto again;
3395                 }
3396                 obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3397                 return false;
3398         case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3399                 if (!pending_result_dec(&obj_req->pending, result))
3400                         return false;
3401                 /* fall through */
3402         case RBD_OBJ_COPYUP_WRITE_OBJECT:
3403                 return true;
3404         default:
3405                 BUG();
3406         }
3407 }
3408
3409 /*
3410  * Return:
3411  *   0 - object map update sent
3412  *   1 - object map update isn't needed
3413  *  <0 - error
3414  */
3415 static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3416 {
3417         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3418         u8 current_state = OBJECT_PENDING;
3419
3420         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3421                 return 1;
3422
3423         if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3424                 return 1;
3425
3426         return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3427                                      &current_state);
3428 }
3429
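/*
 * Advance the write state machine for @obj_req by one step: pre object
 * map update, the write itself, copyup (if the object turned out to be
 * nonexistent) and post object map update.  Returns true when the
 * object request is done, false while waiting.
 */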
3430 static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3431 {
3432         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3433         int ret;
3434
3435 again:
3436         switch (obj_req->write_state) {
3437         case RBD_OBJ_WRITE_START:
3438                 rbd_assert(!*result);
3439
3440                 if (rbd_obj_write_is_noop(obj_req))
3441                         return true;
3442
3443                 ret = rbd_obj_write_pre_object_map(obj_req);
3444                 if (ret < 0) {
3445                         *result = ret;
3446                         return true;
3447                 }
3448                 obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3449                 if (ret > 0)
3450                         goto again;
3451                 return false;
3452         case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3453                 if (*result) {
3454                         rbd_warn(rbd_dev, "pre object map update failed: %d",
3455                                  *result);
3456                         return true;
3457                 }
3458                 ret = rbd_obj_write_object(obj_req);
3459                 if (ret) {
3460                         *result = ret;
3461                         return true;
3462                 }
3463                 obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3464                 return false;
3465         case RBD_OBJ_WRITE_OBJECT:
3466                 if (*result == -ENOENT) {
3467                         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3468                                 *result = 0;
3469                                 obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3470                                 obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3471                                 goto again;
3472                         }
3473                         /*
3474                          * On a non-existent object:
3475                          *   delete - -ENOENT, truncate/zero - 0
3476                          */
3477                         if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3478                                 *result = 0;
3479                 }
3480                 if (*result)
3481                         return true;
3482
3483                 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3484                 goto again;
3485         case __RBD_OBJ_WRITE_COPYUP:
3486                 if (!rbd_obj_advance_copyup(obj_req, result))
3487                         return false;
3488                 /* fall through */
3489         case RBD_OBJ_WRITE_COPYUP:
3490                 if (*result) {
3491                         rbd_warn(rbd_dev, "copyup failed: %d", *result);
3492                         return true;
3493                 }
3494                 ret = rbd_obj_write_post_object_map(obj_req);
3495                 if (ret < 0) {
3496                         *result = ret;
3497                         return true;
3498                 }
3499                 obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3500                 if (ret > 0)
3501                         goto again;
3502                 return false;
3503         case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3504                 if (*result)
3505                         rbd_warn(rbd_dev, "post object map update failed: %d",
3506                                  *result);
3507                 return true;
3508         default:
3509                 BUG();
3510         }
3511 }
3512
3513 /*
3514  * Return true if @obj_req is completed.
3515  */
3516 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3517                                      int *result)
3518 {
3519         struct rbd_img_request *img_req = obj_req->img_request;
3520         struct rbd_device *rbd_dev = img_req->rbd_dev;
3521         bool done;
3522
3523         mutex_lock(&obj_req->state_mutex);
3524         if (!rbd_img_is_write(img_req))
3525                 done = rbd_obj_advance_read(obj_req, result);
3526         else
3527                 done = rbd_obj_advance_write(obj_req, result);
3528         mutex_unlock(&obj_req->state_mutex);
3529
3530         if (done && *result) {
3531                 rbd_assert(*result < 0);
3532                 rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3533                          obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3534                          obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3535         }
3536         return done;
3537 }
3538
3539 /*
3540  * This is open-coded in rbd_img_handle_request() to avoid parent chain
3541  * recursion.
3542  */
3543 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3544 {
3545         if (__rbd_obj_handle_request(obj_req, &result))
3546                 rbd_img_handle_request(obj_req->img_request, result);
3547 }
3548
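/*
 * Return true if @img_req must be run under the exclusive lock: the
 * feature is enabled, the mapping is writable and the request is a
 * write (or lock_on_read / object map forces it for reads as well).
 */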
3549 static bool need_exclusive_lock(struct rbd_img_request *img_req)
3550 {
3551         struct rbd_device *rbd_dev = img_req->rbd_dev;
3552
3553         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3554                 return false;
3555
3556         if (rbd_is_ro(rbd_dev))
3557                 return false;
3558
3559         rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3560         if (rbd_dev->opts->lock_on_read ||
3561             (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3562                 return true;
3563
3564         return rbd_img_is_write(img_req);
3565 }
3566
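/*
 * Add @img_req to the acquiring or running list depending on whether
 * the exclusive lock is currently held.  Returns true if it is held
 * (the request may proceed), false if the request must wait for it.
 */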
3567 static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3568 {
3569         struct rbd_device *rbd_dev = img_req->rbd_dev;
3570         bool locked;
3571
3572         lockdep_assert_held(&rbd_dev->lock_rwsem);
3573         locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3574         spin_lock(&rbd_dev->lock_lists_lock);
3575         rbd_assert(list_empty(&img_req->lock_item));
3576         if (!locked)
3577                 list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3578         else
3579                 list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3580         spin_unlock(&rbd_dev->lock_lists_lock);
3581         return locked;
3582 }
3583
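/*
 * Remove @img_req from its lock list and, if this was the last running
 * request while the lock is being released, wake up the releaser.
 */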
3584 static void rbd_lock_del_request(struct rbd_img_request *img_req)
3585 {
3586         struct rbd_device *rbd_dev = img_req->rbd_dev;
3587         bool need_wakeup;
3588
3589         lockdep_assert_held(&rbd_dev->lock_rwsem);
3590         spin_lock(&rbd_dev->lock_lists_lock);
3591         rbd_assert(!list_empty(&img_req->lock_item));
3592         list_del_init(&img_req->lock_item);
3593         need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3594                        list_empty(&rbd_dev->running_list));
3595         spin_unlock(&rbd_dev->lock_lists_lock);
3596         if (need_wakeup)
3597                 complete(&rbd_dev->releasing_wait);
3598 }
3599
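/*
 * Return:
 *   0 - lock acquisition scheduled, request must wait
 *   1 - exclusive lock isn't needed or is already held
 *  <0 - error
 */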
3600 static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3601 {
3602         struct rbd_device *rbd_dev = img_req->rbd_dev;
3603
3604         if (!need_exclusive_lock(img_req))
3605                 return 1;
3606
3607         if (rbd_lock_add_request(img_req))
3608                 return 1;
3609
3610         if (rbd_dev->opts->exclusive) {
3611                 WARN_ON(1); /* lock got released? */
3612                 return -EROFS;
3613         }
3614
3615         /*
3616          * Note the use of mod_delayed_work() in rbd_acquire_lock()
3617          * and cancel_delayed_work() in wake_lock_waiters().
3618          */
3619         dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3620         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3621         return 0;
3622 }
3623
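/*
 * Kick off all object requests belonging to @img_req, counting those
 * still in flight in img_req->pending.  Bails on the first object
 * request that completes right away with an error, recording it in
 * img_req->pending.result.
 */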
3624 static void rbd_img_object_requests(struct rbd_img_request *img_req)
3625 {
3626         struct rbd_obj_request *obj_req;
3627
3628         rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3629
3630         for_each_obj_request(img_req, obj_req) {
3631                 int result = 0;
3632
3633                 if (__rbd_obj_handle_request(obj_req, &result)) {
3634                         if (result) {
3635                                 img_req->pending.result = result;
3636                                 return;
3637                         }
3638                 } else {
3639                         img_req->pending.num_pending++;
3640                 }
3641         }
3642 }
3643
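/*
 * Advance the image request state machine by one step: take the
 * exclusive lock if needed, then run the object requests.  Returns
 * true when the image request is done, false while waiting.
 */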
3644 static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3645 {
3646         struct rbd_device *rbd_dev = img_req->rbd_dev;
3647         int ret;
3648
3649 again:
3650         switch (img_req->state) {
3651         case RBD_IMG_START:
3652                 rbd_assert(!*result);
3653
3654                 ret = rbd_img_exclusive_lock(img_req);
3655                 if (ret < 0) {
3656                         *result = ret;
3657                         return true;
3658                 }
3659                 img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3660                 if (ret > 0)
3661                         goto again;
3662                 return false;
3663         case RBD_IMG_EXCLUSIVE_LOCK:
3664                 if (*result)
3665                         return true;
3666
3667                 rbd_assert(!need_exclusive_lock(img_req) ||
3668                            __rbd_is_lock_owner(rbd_dev));
3669
3670                 rbd_img_object_requests(img_req);
3671                 if (!img_req->pending.num_pending) {
3672                         *result = img_req->pending.result;
3673                         img_req->state = RBD_IMG_OBJECT_REQUESTS;
3674                         goto again;
3675                 }
3676                 img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3677                 return false;
3678         case __RBD_IMG_OBJECT_REQUESTS:
3679                 if (!pending_result_dec(&img_req->pending, result))
3680                         return false;
3681                 /* fall through */
3682         case RBD_IMG_OBJECT_REQUESTS:
3683                 return true;
3684         default:
3685                 BUG();
3686         }
3687 }
3688
3689 /*
3690  * Return true if @img_req is completed.
3691  */
3692 static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3693                                      int *result)
3694 {
3695         struct rbd_device *rbd_dev = img_req->rbd_dev;
3696         bool done;
3697
3698         if (need_exclusive_lock(img_req)) {
3699                 down_read(&rbd_dev->lock_rwsem);
3700                 mutex_lock(&img_req->state_mutex);
3701                 done = rbd_img_advance(img_req, result);
3702                 if (done)
3703                         rbd_lock_del_request(img_req);
3704                 mutex_unlock(&img_req->state_mutex);
3705                 up_read(&rbd_dev->lock_rwsem);
3706         } else {
3707                 mutex_lock(&img_req->state_mutex);
3708                 done = rbd_img_advance(img_req, result);
3709                 mutex_unlock(&img_req->state_mutex);
3710         }
3711
3712         if (done && *result) {
3713                 rbd_assert(*result < 0);
3714                 rbd_warn(rbd_dev, "%s%s result %d",
3715                       test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3716                       obj_op_name(img_req->op_type), *result);
3717         }
3718         return done;
3719 }
3720
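/*
 * Completion path for image requests.  For child (parent chain)
 * requests the loop walks back up the chain iteratively instead of
 * recursing; for top-level requests the block layer request is ended.
 */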
3721 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3722 {
3723 again:
3724         if (!__rbd_img_handle_request(img_req, &result))
3725                 return;
3726
3727         if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3728                 struct rbd_obj_request *obj_req = img_req->obj_request;
3729
3730                 rbd_img_request_put(img_req);
3731                 if (__rbd_obj_handle_request(obj_req, &result)) {
3732                         img_req = obj_req->img_request;
3733                         goto again;
3734                 }
3735         } else {
3736                 struct request *rq = img_req->rq;
3737
3738                 rbd_img_request_put(img_req);
3739                 blk_mq_end_request(rq, errno_to_blk_status(result));
3740         }
3741 }
3742
3743 static const struct rbd_client_id rbd_empty_cid;
3744
3745 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3746                           const struct rbd_client_id *rhs)
3747 {
3748         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3749 }
3750
3751 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3752 {
3753         struct rbd_client_id cid;
3754
3755         mutex_lock(&rbd_dev->watch_mutex);
3756         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3757         cid.handle = rbd_dev->watch_cookie;
3758         mutex_unlock(&rbd_dev->watch_mutex);
3759         return cid;
3760 }
3761
3762 /*
3763  * lock_rwsem must be held for write
3764  */
3765 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3766                               const struct rbd_client_id *cid)
3767 {
3768         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3769              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3770              cid->gid, cid->handle);
3771         rbd_dev->owner_cid = *cid; /* struct */
3772 }
3773
3774 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3775 {
3776         mutex_lock(&rbd_dev->watch_mutex);
3777         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3778         mutex_unlock(&rbd_dev->watch_mutex);
3779 }
3780
3781 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3782 {
3783         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3784
3785         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3786         strcpy(rbd_dev->lock_cookie, cookie);
3787         rbd_set_owner_cid(rbd_dev, &cid);
3788         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3789 }
3790
3791 /*
3792  * lock_rwsem must be held for write
3793  */
3794 static int rbd_lock(struct rbd_device *rbd_dev)
3795 {
3796         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3797         char cookie[32];
3798         int ret;
3799
3800         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3801                 rbd_dev->lock_cookie[0] != '\0');
3802
3803         format_lock_cookie(rbd_dev, cookie);
3804         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3805                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3806                             RBD_LOCK_TAG, "", 0);
3807         if (ret)
3808                 return ret;
3809
3810         __rbd_lock(rbd_dev, cookie);
3811         return 0;
3812 }
3813
3814 /*
3815  * lock_rwsem must be held for write
3816  */
3817 static void rbd_unlock(struct rbd_device *rbd_dev)
3818 {
3819         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3820         int ret;
3821
3822         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3823                 rbd_dev->lock_cookie[0] == '\0');
3824
3825         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3826                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
3827         if (ret && ret != -ENOENT)
3828                 rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3829
3830         /* treat errors as if the image is unlocked */
3831         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3832         rbd_dev->lock_cookie[0] = '\0';
3833         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3834         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3835 }
3836
3837 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3838                                 enum rbd_notify_op notify_op,
3839                                 struct page ***preply_pages,
3840                                 size_t *preply_len)
3841 {
3842         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3843         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3844         char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3845         int buf_size = sizeof(buf);
3846         void *p = buf;
3847
3848         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3849
3850         /* encode *LockPayload NotifyMessage (op + ClientId) */
3851         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3852         ceph_encode_32(&p, notify_op);
3853         ceph_encode_64(&p, cid.gid);
3854         ceph_encode_64(&p, cid.handle);
3855
3856         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3857                                 &rbd_dev->header_oloc, buf, buf_size,
3858                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3859 }
3860
3861 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3862                                enum rbd_notify_op notify_op)
3863 {
3864         struct page **reply_pages;
3865         size_t reply_len;
3866
3867         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3868         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3869 }
3870
3871 static void rbd_notify_acquired_lock(struct work_struct *work)
3872 {
3873         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3874                                                   acquired_lock_work);
3875
3876         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3877 }
3878
3879 static void rbd_notify_released_lock(struct work_struct *work)
3880 {
3881         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3882                                                   released_lock_work);
3883
3884         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3885 }
3886
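/*
 * Ask the current lock owner to release the lock and decode its
 * ResponseMessage.  Returns the owner's response (0 - will release,
 * -EROFS - refuses to release), -ETIMEDOUT if no owner responded, or
 * another negative error.
 */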
3887 static int rbd_request_lock(struct rbd_device *rbd_dev)
3888 {
3889         struct page **reply_pages;
3890         size_t reply_len;
3891         bool lock_owner_responded = false;
3892         int ret;
3893
3894         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3895
3896         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3897                                    &reply_pages, &reply_len);
3898         if (ret && ret != -ETIMEDOUT) {
3899                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3900                 goto out;
3901         }
3902
3903         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3904                 void *p = page_address(reply_pages[0]);
3905                 void *const end = p + reply_len;
3906                 u32 n;
3907
3908                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3909                 while (n--) {
3910                         u8 struct_v;
3911                         u32 len;
3912
3913                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3914                         p += 8 + 8; /* skip gid and cookie */
3915
3916                         ceph_decode_32_safe(&p, end, len, e_inval);
3917                         if (!len)
3918                                 continue;
3919
3920                         if (lock_owner_responded) {
3921                                 rbd_warn(rbd_dev,
3922                                          "duplicate lock owners detected");
3923                                 ret = -EIO;
3924                                 goto out;
3925                         }
3926
3927                         lock_owner_responded = true;
3928                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3929                                                   &struct_v, &len);
3930                         if (ret) {
3931                                 rbd_warn(rbd_dev,
3932                                          "failed to decode ResponseMessage: %d",
3933                                          ret);
3934                                 goto e_inval;
3935                         }
3936
3937                         ret = ceph_decode_32(&p);
3938                 }
3939         }
3940
3941         if (!lock_owner_responded) {
3942                 rbd_warn(rbd_dev, "no lock owners detected");
3943                 ret = -ETIMEDOUT;
3944         }
3945
3946 out:
3947         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3948         return ret;
3949
3950 e_inval:
3951         ret = -EINVAL;
3952         goto out;
3953 }
3954
3955 /*
3956  * Wake up the waiters for the exclusive lock: either image request
3957  * state machine(s) or rbd_add_acquire_lock() (i.e. "rbd map").
3958  */
3959 static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3960 {
3961         struct rbd_img_request *img_req;
3962
3963         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3964         lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3965
3966         cancel_delayed_work(&rbd_dev->lock_dwork);
3967         if (!completion_done(&rbd_dev->acquire_wait)) {
3968                 rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3969                            list_empty(&rbd_dev->running_list));
3970                 rbd_dev->acquire_err = result;
3971                 complete_all(&rbd_dev->acquire_wait);
3972                 return;
3973         }
3974
3975         list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3976                 mutex_lock(&img_req->state_mutex);
3977                 rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3978                 rbd_img_schedule(img_req, result);
3979                 mutex_unlock(&img_req->state_mutex);
3980         }
3981
3982         list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3983 }
3984
3985 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3986                                struct ceph_locker **lockers, u32 *num_lockers)
3987 {
3988         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3989         u8 lock_type;
3990         char *lock_tag;
3991         int ret;
3992
3993         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3994
3995         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3996                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3997                                  &lock_type, &lock_tag, lockers, num_lockers);
3998         if (ret)
3999                 return ret;
4000
4001         if (*num_lockers == 0) {
4002                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
4003                 goto out;
4004         }
4005
4006         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
4007                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
4008                          lock_tag);
4009                 ret = -EBUSY;
4010                 goto out;
4011         }
4012
4013         if (lock_type == CEPH_CLS_LOCK_SHARED) {
4014                 rbd_warn(rbd_dev, "shared lock type detected");
4015                 ret = -EBUSY;
4016                 goto out;
4017         }
4018
4019         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
4020                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
4021                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
4022                          (*lockers)[0].id.cookie);
4023                 ret = -EBUSY;
4024                 goto out;
4025         }
4026
4027 out:
4028         kfree(lock_tag);
4029         return ret;
4030 }
4031
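/*
 * Return:
 *   1 - the locker still has a watch on the header object (alive)
 *   0 - no matching watcher, the lock can be broken
 *  <0 - error
 */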
4032 static int find_watcher(struct rbd_device *rbd_dev,
4033                         const struct ceph_locker *locker)
4034 {
4035         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4036         struct ceph_watch_item *watchers;
4037         u32 num_watchers;
4038         u64 cookie;
4039         int i;
4040         int ret;
4041
4042         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
4043                                       &rbd_dev->header_oloc, &watchers,
4044                                       &num_watchers);
4045         if (ret)
4046                 return ret;
4047
4048         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
4049         for (i = 0; i < num_watchers; i++) {
4050                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
4051                             sizeof(locker->info.addr)) &&
4052                     watchers[i].cookie == cookie) {
4053                         struct rbd_client_id cid = {
4054                                 .gid = le64_to_cpu(watchers[i].name.num),
4055                                 .handle = cookie,
4056                         };
4057
4058                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
4059                              rbd_dev, cid.gid, cid.handle);
4060                         rbd_set_owner_cid(rbd_dev, &cid);
4061                         ret = 1;
4062                         goto out;
4063                 }
4064         }
4065
4066         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
4067         ret = 0;
4068 out:
4069         kfree(watchers);
4070         return ret;
4071 }
4072
4073 /*
4074  * lock_rwsem must be held for write
4075  */
4076 static int rbd_try_lock(struct rbd_device *rbd_dev)
4077 {
4078         struct ceph_client *client = rbd_dev->rbd_client->client;
4079         struct ceph_locker *lockers;
4080         u32 num_lockers;
4081         int ret;
4082
4083         for (;;) {
4084                 ret = rbd_lock(rbd_dev);
4085                 if (ret != -EBUSY)
4086                         return ret;
4087
4088                 /* determine if the current lock holder is still alive */
4089                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
4090                 if (ret)
4091                         return ret;
4092
4093                 if (num_lockers == 0)
4094                         goto again;
4095
4096                 ret = find_watcher(rbd_dev, lockers);
4097                 if (ret)
4098                         goto out; /* request lock or error */
4099
4100                 rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
4101                          ENTITY_NAME(lockers[0].id.name));
4102
4103                 ret = ceph_monc_blacklist_add(&client->monc,
4104                                               &lockers[0].info.addr);
4105                 if (ret) {
4106                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
4107                                  ENTITY_NAME(lockers[0].id.name), ret);
4108                         goto out;
4109                 }
4110
4111                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
4112                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
4113                                           lockers[0].id.cookie,
4114                                           &lockers[0].id.name);
4115                 if (ret && ret != -ENOENT)
4116                         goto out;
4117
4118 again:
4119                 ceph_free_lockers(lockers, num_lockers);
4120         }
4121
4122 out:
4123         ceph_free_lockers(lockers, num_lockers);
4124         return ret;
4125 }
4126
4127 static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
4128 {
4129         int ret;
4130
4131         if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4132                 ret = rbd_object_map_open(rbd_dev);
4133                 if (ret)
4134                         return ret;
4135         }
4136
4137         return 0;
4138 }
4139
4140 /*
4141  * Return:
4142  *   0 - lock acquired
4143  *   1 - caller should call rbd_request_lock()
4144  *  <0 - error
4145  */
4146 static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4147 {
4148         int ret;
4149
4150         down_read(&rbd_dev->lock_rwsem);
4151         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4152              rbd_dev->lock_state);
4153         if (__rbd_is_lock_owner(rbd_dev)) {
4154                 up_read(&rbd_dev->lock_rwsem);
4155                 return 0;
4156         }
4157
4158         up_read(&rbd_dev->lock_rwsem);
4159         down_write(&rbd_dev->lock_rwsem);
4160         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4161              rbd_dev->lock_state);
4162         if (__rbd_is_lock_owner(rbd_dev)) {
4163                 up_write(&rbd_dev->lock_rwsem);
4164                 return 0;
4165         }
4166
4167         ret = rbd_try_lock(rbd_dev);
4168         if (ret < 0) {
4169                 rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4170                 if (ret == -EBLACKLISTED)
4171                         goto out;
4172
4173                 ret = 1; /* request lock anyway */
4174         }
4175         if (ret > 0) {
4176                 up_write(&rbd_dev->lock_rwsem);
4177                 return ret;
4178         }
4179
4180         rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4181         rbd_assert(list_empty(&rbd_dev->running_list));
4182
4183         ret = rbd_post_acquire_action(rbd_dev);
4184         if (ret) {
4185                 rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4186                 /*
4187                  * Can't stay in RBD_LOCK_STATE_LOCKED because
4188                  * rbd_lock_add_request() would let the request through,
4189                  * assuming that e.g. object map is locked and loaded.
4190                  */
4191                 rbd_unlock(rbd_dev);
4192         }
4193
4194 out:
4195         wake_lock_waiters(rbd_dev, ret);
4196         up_write(&rbd_dev->lock_rwsem);
4197         return ret;
4198 }
4199
4200 static void rbd_acquire_lock(struct work_struct *work)
4201 {
4202         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4203                                             struct rbd_device, lock_dwork);
4204         int ret;
4205
4206         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4207 again:
4208         ret = rbd_try_acquire_lock(rbd_dev);
4209         if (ret <= 0) {
4210                 dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4211                 return;
4212         }
4213
4214         ret = rbd_request_lock(rbd_dev);
4215         if (ret == -ETIMEDOUT) {
4216                 goto again; /* treat this as a dead client */
4217         } else if (ret == -EROFS) {
4218                 rbd_warn(rbd_dev, "peer will not release lock");
4219                 down_write(&rbd_dev->lock_rwsem);
4220                 wake_lock_waiters(rbd_dev, ret);
4221                 up_write(&rbd_dev->lock_rwsem);
4222         } else if (ret < 0) {
4223                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4224                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4225                                  RBD_RETRY_DELAY);
4226         } else {
4227                 /*
4228                  * lock owner acked, but resend if we don't see them
4229                  * release the lock
4230                  */
4231                 dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4232                      rbd_dev);
4233                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4234                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4235         }
4236 }
4237
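/*
 * Switch to RBD_LOCK_STATE_RELEASING and wait for in-flight requests
 * on the running list to drain.  Returns true if the lock can now be
 * released, false if it wasn't held or the state changed while
 * waiting.
 */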
4238 static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4239 {
4240         bool need_wait;
4241
4242         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4243         lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4244
4245         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4246                 return false;
4247
4248         /*
4249          * Ensure that all in-flight IO is flushed.
4250          */
4251         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4252         rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4253         need_wait = !list_empty(&rbd_dev->running_list);
4254         downgrade_write(&rbd_dev->lock_rwsem);
4255         if (need_wait)
4256                 wait_for_completion(&rbd_dev->releasing_wait);
4257         up_read(&rbd_dev->lock_rwsem);
4258
4259         down_write(&rbd_dev->lock_rwsem);
4260         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4261                 return false;
4262
4263         rbd_assert(list_empty(&rbd_dev->running_list));
4264         return true;
4265 }
4266
4267 static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4268 {
4269         if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4270                 rbd_object_map_close(rbd_dev);
4271 }
4272
4273 static void __rbd_release_lock(struct rbd_device *rbd_dev)
4274 {
4275         rbd_assert(list_empty(&rbd_dev->running_list));
4276
4277         rbd_pre_release_action(rbd_dev);
4278         rbd_unlock(rbd_dev);
4279 }
4280
4281 /*
4282  * lock_rwsem must be held for write
4283  */
4284 static void rbd_release_lock(struct rbd_device *rbd_dev)
4285 {
4286         if (!rbd_quiesce_lock(rbd_dev))
4287                 return;
4288
4289         __rbd_release_lock(rbd_dev);
4290
4291         /*
4292          * Give others a chance to grab the lock - we would re-acquire
4293          * almost immediately if we got new IO while draining the running
4294          * list otherwise.  We need to ack our own notifications, so this
4295          * lock_dwork will be requeued from rbd_handle_released_lock() by
4296          * way of maybe_kick_acquire().
4297          */
4298         cancel_delayed_work(&rbd_dev->lock_dwork);
4299 }
4300
4301 static void rbd_release_lock_work(struct work_struct *work)
4302 {
4303         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4304                                                   unlock_work);
4305
4306         down_write(&rbd_dev->lock_rwsem);
4307         rbd_release_lock(rbd_dev);
4308         up_write(&rbd_dev->lock_rwsem);
4309 }
4310
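/*
 * If we aren't the lock owner and there are image requests waiting for
 * the lock (or a lock acquisition is already scheduled), (re)queue
 * lock_dwork immediately.
 */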
4311 static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4312 {
4313         bool have_requests;
4314
4315         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4316         if (__rbd_is_lock_owner(rbd_dev))
4317                 return;
4318
4319         spin_lock(&rbd_dev->lock_lists_lock);
4320         have_requests = !list_empty(&rbd_dev->acquiring_list);
4321         spin_unlock(&rbd_dev->lock_lists_lock);
4322         if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4323                 dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4324                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4325         }
4326 }
4327
4328 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4329                                      void **p)
4330 {
4331         struct rbd_client_id cid = { 0 };
4332
4333         if (struct_v >= 2) {
4334                 cid.gid = ceph_decode_64(p);
4335                 cid.handle = ceph_decode_64(p);
4336         }
4337
4338         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4339              cid.handle);
4340         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4341                 down_write(&rbd_dev->lock_rwsem);
4342                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4343                         /*
4344                          * we already know that the remote client is
4345                          * the owner
4346                          */
4347                         up_write(&rbd_dev->lock_rwsem);
4348                         return;
4349                 }
4350
4351                 rbd_set_owner_cid(rbd_dev, &cid);
4352                 downgrade_write(&rbd_dev->lock_rwsem);
4353         } else {
4354                 down_read(&rbd_dev->lock_rwsem);
4355         }
4356
4357         maybe_kick_acquire(rbd_dev);
4358         up_read(&rbd_dev->lock_rwsem);
4359 }
4360
4361 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4362                                      void **p)
4363 {
4364         struct rbd_client_id cid = { 0 };
4365
4366         if (struct_v >= 2) {
4367                 cid.gid = ceph_decode_64(p);
4368                 cid.handle = ceph_decode_64(p);
4369         }
4370
4371         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4372              cid.handle);
4373         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4374                 down_write(&rbd_dev->lock_rwsem);
4375                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4376                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
4377                              __func__, rbd_dev, cid.gid, cid.handle,
4378                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4379                         up_write(&rbd_dev->lock_rwsem);
4380                         return;
4381                 }
4382
4383                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4384                 downgrade_write(&rbd_dev->lock_rwsem);
4385         } else {
4386                 down_read(&rbd_dev->lock_rwsem);
4387         }
4388
4389         maybe_kick_acquire(rbd_dev);
4390         up_read(&rbd_dev->lock_rwsem);
4391 }
4392
4393 /*
4394  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4395  * ResponseMessage is needed.
4396  */
4397 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4398                                    void **p)
4399 {
4400         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4401         struct rbd_client_id cid = { 0 };
4402         int result = 1;
4403
4404         if (struct_v >= 2) {
4405                 cid.gid = ceph_decode_64(p);
4406                 cid.handle = ceph_decode_64(p);
4407         }
4408
4409         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4410              cid.handle);
4411         if (rbd_cid_equal(&cid, &my_cid))
4412                 return result;
4413
4414         down_read(&rbd_dev->lock_rwsem);
4415         if (__rbd_is_lock_owner(rbd_dev)) {
4416                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4417                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4418                         goto out_unlock;
4419
4420                 /*
4421                  * encode ResponseMessage(0) so the peer can detect
4422                  * a missing owner
4423                  */
4424                 result = 0;
4425
4426                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4427                         if (!rbd_dev->opts->exclusive) {
4428                                 dout("%s rbd_dev %p queueing unlock_work\n",
4429                                      __func__, rbd_dev);
4430                                 queue_work(rbd_dev->task_wq,
4431                                            &rbd_dev->unlock_work);
4432                         } else {
4433                                 /* refuse to release the lock */
4434                                 result = -EROFS;
4435                         }
4436                 }
4437         }
4438
4439 out_unlock:
4440         up_read(&rbd_dev->lock_rwsem);
4441         return result;
4442 }
4443
4444 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4445                                      u64 notify_id, u64 cookie, s32 *result)
4446 {
4447         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4448         char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4449         int buf_size = sizeof(buf);
4450         int ret;
4451
4452         if (result) {
4453                 void *p = buf;
4454
4455                 /* encode ResponseMessage */
4456                 ceph_start_encoding(&p, 1, 1,
4457                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
4458                 ceph_encode_32(&p, *result);
4459         } else {
4460                 buf_size = 0;
4461         }
4462
4463         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4464                                    &rbd_dev->header_oloc, notify_id, cookie,
4465                                    buf, buf_size);
4466         if (ret)
4467                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4468 }
4469
4470 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4471                                    u64 cookie)
4472 {
4473         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4474         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4475 }
4476
4477 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4478                                           u64 notify_id, u64 cookie, s32 result)
4479 {
4480         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4481         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4482 }
4483
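/*
 * Callback for notifications on the header object: dispatch on
 * notify_op (lock acquired/released/requested, header update) and
 * acknowledge the notification.
 */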
4484 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4485                          u64 notifier_id, void *data, size_t data_len)
4486 {
4487         struct rbd_device *rbd_dev = arg;
4488         void *p = data;
4489         void *const end = p + data_len;
4490         u8 struct_v = 0;
4491         u32 len;
4492         u32 notify_op;
4493         int ret;
4494
4495         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4496              __func__, rbd_dev, cookie, notify_id, data_len);
4497         if (data_len) {
4498                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4499                                           &struct_v, &len);
4500                 if (ret) {
4501                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4502                                  ret);
4503                         return;
4504                 }
4505
4506                 notify_op = ceph_decode_32(&p);
4507         } else {
4508                 /* legacy notification for header updates */
4509                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4510                 len = 0;
4511         }
4512
4513         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4514         switch (notify_op) {
4515         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4516                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4517                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4518                 break;
4519         case RBD_NOTIFY_OP_RELEASED_LOCK:
4520                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
4521                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4522                 break;
4523         case RBD_NOTIFY_OP_REQUEST_LOCK:
4524                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4525                 if (ret <= 0)
4526                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
4527                                                       cookie, ret);
4528                 else
4529                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4530                 break;
4531         case RBD_NOTIFY_OP_HEADER_UPDATE:
4532                 ret = rbd_dev_refresh(rbd_dev);
4533                 if (ret)
4534                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
4535
4536                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4537                 break;
4538         default:
4539                 if (rbd_is_lock_owner(rbd_dev))
4540                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
4541                                                       cookie, -EOPNOTSUPP);
4542                 else
4543                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4544                 break;
4545         }
4546 }
4547
4548 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4549
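/*
 * Watch error callback.  Forget the current lock owner and, if the
 * watch is still registered, tear it down and schedule re-registration.
 */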
4550 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4551 {
4552         struct rbd_device *rbd_dev = arg;
4553
4554         rbd_warn(rbd_dev, "encountered watch error: %d", err);
4555
4556         down_write(&rbd_dev->lock_rwsem);
4557         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4558         up_write(&rbd_dev->lock_rwsem);
4559
4560         mutex_lock(&rbd_dev->watch_mutex);
4561         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4562                 __rbd_unregister_watch(rbd_dev);
4563                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4564
4565                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4566         }
4567         mutex_unlock(&rbd_dev->watch_mutex);
4568 }
4569
4570 /*
4571  * watch_mutex must be locked
4572  */
4573 static int __rbd_register_watch(struct rbd_device *rbd_dev)
4574 {
4575         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4576         struct ceph_osd_linger_request *handle;
4577
4578         rbd_assert(!rbd_dev->watch_handle);
4579         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4580
4581         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4582                                  &rbd_dev->header_oloc, rbd_watch_cb,
4583                                  rbd_watch_errcb, rbd_dev);
4584         if (IS_ERR(handle))
4585                 return PTR_ERR(handle);
4586
4587         rbd_dev->watch_handle = handle;
4588         return 0;
4589 }
4590
4591 /*
4592  * watch_mutex must be locked
4593  */
4594 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4595 {
4596         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4597         int ret;
4598
4599         rbd_assert(rbd_dev->watch_handle);
4600         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4601
4602         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4603         if (ret)
4604                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4605
4606         rbd_dev->watch_handle = NULL;
4607 }
4608
4609 static int rbd_register_watch(struct rbd_device *rbd_dev)
4610 {
4611         int ret;
4612
4613         mutex_lock(&rbd_dev->watch_mutex);
4614         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4615         ret = __rbd_register_watch(rbd_dev);
4616         if (ret)
4617                 goto out;
4618
4619         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4620         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4621
4622 out:
4623         mutex_unlock(&rbd_dev->watch_mutex);
4624         return ret;
4625 }
4626
4627 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4628 {
4629         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4630
4631         cancel_work_sync(&rbd_dev->acquired_lock_work);
4632         cancel_work_sync(&rbd_dev->released_lock_work);
4633         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4634         cancel_work_sync(&rbd_dev->unlock_work);
4635 }
4636
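/*
 * Tear down the watch: cancel lock-related work items, unregister the
 * watch if needed and flush in-flight notify callbacks.
 */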
4637 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4638 {
4639         cancel_tasks_sync(rbd_dev);
4640
4641         mutex_lock(&rbd_dev->watch_mutex);
4642         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4643                 __rbd_unregister_watch(rbd_dev);
4644         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4645         mutex_unlock(&rbd_dev->watch_mutex);
4646
4647         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4648         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4649 }
4650
4651 /*
4652  * lock_rwsem must be held for write
4653  */
4654 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4655 {
4656         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4657         char cookie[32];
4658         int ret;
4659
4660         if (!rbd_quiesce_lock(rbd_dev))
4661                 return;
4662
4663         format_lock_cookie(rbd_dev, cookie);
4664         ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4665                                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
4666                                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4667                                   RBD_LOCK_TAG, cookie);
4668         if (ret) {
4669                 if (ret != -EOPNOTSUPP)
4670                         rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4671                                  ret);
4672
4673                 /*
4674                  * Lock cookie cannot be updated on older OSDs, so do
4675                  * a manual release and queue an acquire.
4676                  */
4677                 __rbd_release_lock(rbd_dev);
4678                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4679         } else {
4680                 __rbd_lock(rbd_dev, cookie);
4681                 wake_lock_waiters(rbd_dev, 0);
4682         }
4683 }
4684
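/*
 * Delayed work to re-establish the watch after an error.  Transient
 * failures are retried; fatal ones (blacklisting or a missing header
 * object) fail any lock waiters.  On success, re-acquire the exclusive
 * lock if it was held and refresh the header.
 */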
4685 static void rbd_reregister_watch(struct work_struct *work)
4686 {
4687         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4688                                             struct rbd_device, watch_dwork);
4689         int ret;
4690
4691         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4692
4693         mutex_lock(&rbd_dev->watch_mutex);
4694         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4695                 mutex_unlock(&rbd_dev->watch_mutex);
4696                 return;
4697         }
4698
4699         ret = __rbd_register_watch(rbd_dev);
4700         if (ret) {
4701                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4702                 if (ret != -EBLACKLISTED && ret != -ENOENT) {
4703                         queue_delayed_work(rbd_dev->task_wq,
4704                                            &rbd_dev->watch_dwork,
4705                                            RBD_RETRY_DELAY);
4706                         mutex_unlock(&rbd_dev->watch_mutex);
4707                         return;
4708                 }
4709
4710                 mutex_unlock(&rbd_dev->watch_mutex);
4711                 down_write(&rbd_dev->lock_rwsem);
4712                 wake_lock_waiters(rbd_dev, ret);
4713                 up_write(&rbd_dev->lock_rwsem);
4714                 return;
4715         }
4716
4717         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4718         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4719         mutex_unlock(&rbd_dev->watch_mutex);
4720
4721         down_write(&rbd_dev->lock_rwsem);
4722         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4723                 rbd_reacquire_lock(rbd_dev);
4724         up_write(&rbd_dev->lock_rwsem);
4725
4726         ret = rbd_dev_refresh(rbd_dev);
4727         if (ret)
4728                 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4729 }
4730
4731 /*
4732  * Synchronous osd object method call.  Returns the number of bytes
4733  * placed into the inbound buffer, or a negative error code.
4734  */
4735 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4736                              struct ceph_object_id *oid,
4737                              struct ceph_object_locator *oloc,
4738                              const char *method_name,
4739                              const void *outbound,
4740                              size_t outbound_size,
4741                              void *inbound,
4742                              size_t inbound_size)
4743 {
4744         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4745         struct page *req_page = NULL;
4746         struct page *reply_page;
4747         int ret;
4748
4749         /*
4750          * Method calls are ultimately read operations.  The result
4751          * should be placed into the inbound buffer provided.  Callers
4752          * may also supply outbound data--parameters for the object
4753          * method.  Currently if this is present it will be a
4754          * snapshot id.
4755          */
4756         if (outbound) {
4757                 if (outbound_size > PAGE_SIZE)
4758                         return -E2BIG;
4759
4760                 req_page = alloc_page(GFP_KERNEL);
4761                 if (!req_page)
4762                         return -ENOMEM;
4763
4764                 memcpy(page_address(req_page), outbound, outbound_size);
4765         }
4766
4767         reply_page = alloc_page(GFP_KERNEL);
4768         if (!reply_page) {
4769                 if (req_page)
4770                         __free_page(req_page);
4771                 return -ENOMEM;
4772         }
4773
4774         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4775                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
4776                              &reply_page, &inbound_size);
4777         if (!ret) {
4778                 memcpy(inbound, page_address(reply_page), inbound_size);
4779                 ret = inbound_size;
4780         }
4781
4782         if (req_page)
4783                 __free_page(req_page);
4784         __free_page(reply_page);
4785         return ret;
4786 }
4787
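/*
 * Per-request work function, run from rbd_wq.  Validate the request,
 * build an image request and submit it; errors complete the request
 * with an appropriate status.
 */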
4788 static void rbd_queue_workfn(struct work_struct *work)
4789 {
4790         struct request *rq = blk_mq_rq_from_pdu(work);
4791         struct rbd_device *rbd_dev = rq->q->queuedata;
4792         struct rbd_img_request *img_request;
4793         struct ceph_snap_context *snapc = NULL;
4794         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4795         u64 length = blk_rq_bytes(rq);
4796         enum obj_operation_type op_type;
4797         u64 mapping_size;
4798         int result;
4799
4800         switch (req_op(rq)) {
4801         case REQ_OP_DISCARD:
4802                 op_type = OBJ_OP_DISCARD;
4803                 break;
4804         case REQ_OP_WRITE_ZEROES:
4805                 op_type = OBJ_OP_ZEROOUT;
4806                 break;
4807         case REQ_OP_WRITE:
4808                 op_type = OBJ_OP_WRITE;
4809                 break;
4810         case REQ_OP_READ:
4811                 op_type = OBJ_OP_READ;
4812                 break;
4813         default:
4814                 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4815                 result = -EIO;
4816                 goto err;
4817         }
4818
4819         /* Ignore/skip any zero-length requests */
4820
4821         if (!length) {
4822                 dout("%s: zero-length request\n", __func__);
4823                 result = 0;
4824                 goto err_rq;
4825         }
4826
4827         if (op_type != OBJ_OP_READ) {
4828                 if (rbd_is_ro(rbd_dev)) {
4829                         rbd_warn(rbd_dev, "%s on read-only mapping",
4830                                  obj_op_name(op_type));
4831                         result = -EIO;
4832                         goto err;
4833                 }
4834                 rbd_assert(!rbd_is_snap(rbd_dev));
4835         }
4836
4837         if (offset && length > U64_MAX - offset + 1) {
4838                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4839                          length);
4840                 result = -EINVAL;
4841                 goto err_rq;    /* Shouldn't happen */
4842         }
4843
4844         blk_mq_start_request(rq);
4845
4846         down_read(&rbd_dev->header_rwsem);
4847         mapping_size = rbd_dev->mapping.size;
4848         if (op_type != OBJ_OP_READ) {
4849                 snapc = rbd_dev->header.snapc;
4850                 ceph_get_snap_context(snapc);
4851         }
4852         up_read(&rbd_dev->header_rwsem);
4853
4854         if (offset + length > mapping_size) {
4855                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4856                          length, mapping_size);
4857                 result = -EIO;
4858                 goto err_rq;
4859         }
4860
4861         img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
4862         if (!img_request) {
4863                 result = -ENOMEM;
4864                 goto err_rq;
4865         }
4866         img_request->rq = rq;
4867         snapc = NULL; /* img_request consumes a ref */
4868
4869         dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4870              img_request, obj_op_name(op_type), offset, length);
4871
4872         if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4873                 result = rbd_img_fill_nodata(img_request, offset, length);
4874         else
4875                 result = rbd_img_fill_from_bio(img_request, offset, length,
4876                                                rq->bio);
4877         if (result)
4878                 goto err_img_request;
4879
4880         rbd_img_handle_request(img_request, 0);
4881         return;
4882
4883 err_img_request:
4884         rbd_img_request_put(img_request);
4885 err_rq:
4886         if (result)
4887                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4888                          obj_op_name(op_type), length, offset, result);
4889         ceph_put_snap_context(snapc);
4890 err:
4891         blk_mq_end_request(rq, errno_to_blk_status(result));
4892 }
4893
4894 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4895                 const struct blk_mq_queue_data *bd)
4896 {
4897         struct request *rq = bd->rq;
4898         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4899
4900         queue_work(rbd_wq, work);
4901         return BLK_STS_OK;
4902 }
4903
4904 static void rbd_free_disk(struct rbd_device *rbd_dev)
4905 {
4906         blk_cleanup_queue(rbd_dev->disk->queue);
4907         blk_mq_free_tag_set(&rbd_dev->tag_set);
4908         put_disk(rbd_dev->disk);
4909         rbd_dev->disk = NULL;
4910 }
4911
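/*
 * Synchronously read up to @buf_len bytes from the given object into
 * @buf.  Returns the number of bytes read or a negative error code.
 */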
4912 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4913                              struct ceph_object_id *oid,
4914                              struct ceph_object_locator *oloc,
4915                              void *buf, int buf_len)
4917 {
4918         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4919         struct ceph_osd_request *req;
4920         struct page **pages;
4921         int num_pages = calc_pages_for(0, buf_len);
4922         int ret;
4923
4924         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4925         if (!req)
4926                 return -ENOMEM;
4927
4928         ceph_oid_copy(&req->r_base_oid, oid);
4929         ceph_oloc_copy(&req->r_base_oloc, oloc);
4930         req->r_flags = CEPH_OSD_FLAG_READ;
4931
4932         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4933         if (IS_ERR(pages)) {
4934                 ret = PTR_ERR(pages);
4935                 goto out_req;
4936         }
4937
4938         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4939         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4940                                          true);
4941
4942         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4943         if (ret)
4944                 goto out_req;
4945
4946         ceph_osdc_start_request(osdc, req, false);
4947         ret = ceph_osdc_wait_request(osdc, req);
4948         if (ret >= 0)
4949                 ceph_copy_from_page_vector(pages, buf, 0, ret);
4950
4951 out_req:
4952         ceph_osdc_put_request(req);
4953         return ret;
4954 }
4955
4956 /*
4957  * Read the complete header for the given rbd device.  On successful
4958  * return, the rbd_dev->header field will contain up-to-date
4959  * information about the image.
4960  */
4961 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4962 {
4963         struct rbd_image_header_ondisk *ondisk = NULL;
4964         u32 snap_count = 0;
4965         u64 names_size = 0;
4966         u32 want_count;
4967         int ret;
4968
4969         /*
4970          * The complete header will include an array of its 64-bit
4971          * snapshot ids, followed by the names of those snapshots as
4972          * a contiguous block of NUL-terminated strings.  Note that
4973          * the number of snapshots could change by the time we read
4974          * it in, in which case we re-read it.
4975          */
4976         do {
4977                 size_t size;
4978
4979                 kfree(ondisk);
4980
4981                 size = sizeof (*ondisk);
4982                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4983                 size += names_size;
4984                 ondisk = kmalloc(size, GFP_KERNEL);
4985                 if (!ondisk)
4986                         return -ENOMEM;
4987
4988                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4989                                         &rbd_dev->header_oloc, ondisk, size);
4990                 if (ret < 0)
4991                         goto out;
4992                 if ((size_t)ret < size) {
4993                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
4994                                 size, ret);
4995                         ret = -ENXIO;
4996                         goto out;
4997                 }
4998                 if (!rbd_dev_ondisk_valid(ondisk)) {
4999                         ret = -ENXIO;
5000                         rbd_warn(rbd_dev, "invalid header");
5001                         goto out;
5002                 }
5003
5004                 names_size = le64_to_cpu(ondisk->snap_names_len);
5005                 want_count = snap_count;
5006                 snap_count = le32_to_cpu(ondisk->snap_count);
5007         } while (snap_count != want_count);
5008
5009         ret = rbd_header_from_disk(rbd_dev, ondisk);
5010 out:
5011         kfree(ondisk);
5012
5013         return ret;
5014 }
5015
5016 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
5017 {
5018         sector_t size;
5019
5020         /*
5021          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
5022          * try to update its size.  If REMOVING is set, updating size
5023          * is just useless work since the device can't be opened.
5024          */
5025         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
5026             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
5027                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
5028                 dout("setting size to %llu sectors", (unsigned long long)size);
5029                 set_capacity(rbd_dev->disk, size);
5030                 revalidate_disk(rbd_dev->disk);
5031         }
5032 }
5033
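/*
 * Re-read the image header (and parent info for clones) and propagate
 * any size change to the block device.
 */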
5034 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
5035 {
5036         u64 mapping_size;
5037         int ret;
5038
5039         down_write(&rbd_dev->header_rwsem);
5040         mapping_size = rbd_dev->mapping.size;
5041
5042         ret = rbd_dev_header_info(rbd_dev);
5043         if (ret)
5044                 goto out;
5045
5046         /*
5047          * If there is a parent, see if it has disappeared due to the
5048          * mapped image getting flattened.
5049          */
5050         if (rbd_dev->parent) {
5051                 ret = rbd_dev_v2_parent_info(rbd_dev);
5052                 if (ret)
5053                         goto out;
5054         }
5055
5056         rbd_assert(!rbd_is_snap(rbd_dev));
5057         rbd_dev->mapping.size = rbd_dev->header.image_size;
5058
5059 out:
5060         up_write(&rbd_dev->header_rwsem);
5061         if (!ret && mapping_size != rbd_dev->mapping.size)
5062                 rbd_dev_update_size(rbd_dev);
5063
5064         return ret;
5065 }
5066
5067 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
5068                 unsigned int hctx_idx, unsigned int numa_node)
5069 {
5070         struct work_struct *work = blk_mq_rq_to_pdu(rq);
5071
5072         INIT_WORK(work, rbd_queue_workfn);
5073         return 0;
5074 }
5075
5076 static const struct blk_mq_ops rbd_mq_ops = {
5077         .queue_rq       = rbd_queue_rq,
5078         .init_request   = rbd_init_request,
5079 };
5080
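/*
 * Set up the gendisk, blk-mq tag set and request queue, applying queue
 * limits derived from the image layout and mapping options.
 */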
5081 static int rbd_init_disk(struct rbd_device *rbd_dev)
5082 {
5083         struct gendisk *disk;
5084         struct request_queue *q;
5085         unsigned int objset_bytes =
5086             rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
5087         int err;
5088
5089         /* create gendisk info */
5090         disk = alloc_disk(single_major ?
5091                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
5092                           RBD_MINORS_PER_MAJOR);
5093         if (!disk)
5094                 return -ENOMEM;
5095
5096         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
5097                  rbd_dev->dev_id);
5098         disk->major = rbd_dev->major;
5099         disk->first_minor = rbd_dev->minor;
5100         if (single_major)
5101                 disk->flags |= GENHD_FL_EXT_DEVT;
5102         disk->fops = &rbd_bd_ops;
5103         disk->private_data = rbd_dev;
5104
5105         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
5106         rbd_dev->tag_set.ops = &rbd_mq_ops;
5107         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
5108         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
5109         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
5110         rbd_dev->tag_set.nr_hw_queues = 1;
5111         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
5112
5113         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
5114         if (err)
5115                 goto out_disk;
5116
5117         q = blk_mq_init_queue(&rbd_dev->tag_set);
5118         if (IS_ERR(q)) {
5119                 err = PTR_ERR(q);
5120                 goto out_tag_set;
5121         }
5122
5123         blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
5124         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
5125
5126         blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
5127         q->limits.max_sectors = queue_max_hw_sectors(q);
5128         blk_queue_max_segments(q, USHRT_MAX);
5129         blk_queue_max_segment_size(q, UINT_MAX);
5130         blk_queue_io_min(q, rbd_dev->opts->alloc_size);
5131         blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
5132
5133         if (rbd_dev->opts->trim) {
5134                 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
5135                 q->limits.discard_granularity = rbd_dev->opts->alloc_size;
5136                 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
5137                 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
5138         }
5139
5140         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
5141                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
5142
5143         /*
5144          * disk_release() expects a queue ref from add_disk() and will
5145          * put it.  Hold an extra ref until add_disk() is called.
5146          */
5147         WARN_ON(!blk_get_queue(q));
5148         disk->queue = q;
5149         q->queuedata = rbd_dev;
5150
5151         rbd_dev->disk = disk;
5152
5153         return 0;
5154 out_tag_set:
5155         blk_mq_free_tag_set(&rbd_dev->tag_set);
5156 out_disk:
5157         put_disk(disk);
5158         return err;
5159 }
5160
5161 /*
5162  * sysfs
5163  */
5164
5165 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
5166 {
5167         return container_of(dev, struct rbd_device, dev);
5168 }
5169
5170 static ssize_t rbd_size_show(struct device *dev,
5171                              struct device_attribute *attr, char *buf)
5172 {
5173         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5174
5175         return sprintf(buf, "%llu\n",
5176                 (unsigned long long)rbd_dev->mapping.size);
5177 }
5178
5179 static ssize_t rbd_features_show(struct device *dev,
5180                              struct device_attribute *attr, char *buf)
5181 {
5182         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5183
5184         return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
5185 }
5186
5187 static ssize_t rbd_major_show(struct device *dev,
5188                               struct device_attribute *attr, char *buf)
5189 {
5190         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5191
5192         if (rbd_dev->major)
5193                 return sprintf(buf, "%d\n", rbd_dev->major);
5194
5195         return sprintf(buf, "(none)\n");
5196 }
5197
5198 static ssize_t rbd_minor_show(struct device *dev,
5199                               struct device_attribute *attr, char *buf)
5200 {
5201         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5202
5203         return sprintf(buf, "%d\n", rbd_dev->minor);
5204 }
5205
5206 static ssize_t rbd_client_addr_show(struct device *dev,
5207                                     struct device_attribute *attr, char *buf)
5208 {
5209         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5210         struct ceph_entity_addr *client_addr =
5211             ceph_client_addr(rbd_dev->rbd_client->client);
5212
5213         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5214                        le32_to_cpu(client_addr->nonce));
5215 }
5216
5217 static ssize_t rbd_client_id_show(struct device *dev,
5218                                   struct device_attribute *attr, char *buf)
5219 {
5220         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5221
5222         return sprintf(buf, "client%lld\n",
5223                        ceph_client_gid(rbd_dev->rbd_client->client));
5224 }
5225
5226 static ssize_t rbd_cluster_fsid_show(struct device *dev,
5227                                      struct device_attribute *attr, char *buf)
5228 {
5229         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5230
5231         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5232 }
5233
5234 static ssize_t rbd_config_info_show(struct device *dev,
5235                                     struct device_attribute *attr, char *buf)
5236 {
5237         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5238
5239         return sprintf(buf, "%s\n", rbd_dev->config_info);
5240 }
5241
5242 static ssize_t rbd_pool_show(struct device *dev,
5243                              struct device_attribute *attr, char *buf)
5244 {
5245         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5246
5247         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5248 }
5249
5250 static ssize_t rbd_pool_id_show(struct device *dev,
5251                              struct device_attribute *attr, char *buf)
5252 {
5253         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5254
5255         return sprintf(buf, "%llu\n",
5256                         (unsigned long long) rbd_dev->spec->pool_id);
5257 }
5258
5259 static ssize_t rbd_pool_ns_show(struct device *dev,
5260                                 struct device_attribute *attr, char *buf)
5261 {
5262         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5263
5264         return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5265 }
5266
5267 static ssize_t rbd_name_show(struct device *dev,
5268                              struct device_attribute *attr, char *buf)
5269 {
5270         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5271
5272         if (rbd_dev->spec->image_name)
5273                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5274
5275         return sprintf(buf, "(unknown)\n");
5276 }
5277
5278 static ssize_t rbd_image_id_show(struct device *dev,
5279                              struct device_attribute *attr, char *buf)
5280 {
5281         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5282
5283         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5284 }
5285
5286 /*
5287  * Shows the name of the currently-mapped snapshot (or
5288  * RBD_SNAP_HEAD_NAME for the base image).
5289  */
5290 static ssize_t rbd_snap_show(struct device *dev,
5291                              struct device_attribute *attr,
5292                              char *buf)
5293 {
5294         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5295
5296         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5297 }
5298
5299 static ssize_t rbd_snap_id_show(struct device *dev,
5300                                 struct device_attribute *attr, char *buf)
5301 {
5302         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5303
5304         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5305 }
5306
5307 /*
5308  * For a v2 image, shows the chain of parent images, separated by empty
5309  * lines.  For v1 images or if there is no parent, shows "(no parent
5310  * image)".
5311  */
5312 static ssize_t rbd_parent_show(struct device *dev,
5313                                struct device_attribute *attr,
5314                                char *buf)
5315 {
5316         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5317         ssize_t count = 0;
5318
5319         if (!rbd_dev->parent)
5320                 return sprintf(buf, "(no parent image)\n");
5321
5322         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5323                 struct rbd_spec *spec = rbd_dev->parent_spec;
5324
5325                 count += sprintf(&buf[count], "%s"
5326                             "pool_id %llu\npool_name %s\n"
5327                             "pool_ns %s\n"
5328                             "image_id %s\nimage_name %s\n"
5329                             "snap_id %llu\nsnap_name %s\n"
5330                             "overlap %llu\n",
5331                             !count ? "" : "\n", /* first? */
5332                             spec->pool_id, spec->pool_name,
5333                             spec->pool_ns ?: "",
5334                             spec->image_id, spec->image_name ?: "(unknown)",
5335                             spec->snap_id, spec->snap_name,
5336                             rbd_dev->parent_overlap);
5337         }
5338
5339         return count;
5340 }
5341
5342 static ssize_t rbd_image_refresh(struct device *dev,
5343                                  struct device_attribute *attr,
5344                                  const char *buf,
5345                                  size_t size)
5346 {
5347         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5348         int ret;
5349
5350         ret = rbd_dev_refresh(rbd_dev);
5351         if (ret)
5352                 return ret;
5353
5354         return size;
5355 }
5356
5357 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5358 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5359 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5360 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5361 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5362 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5363 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5364 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5365 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5366 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5367 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5368 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5369 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5370 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5371 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5372 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5373 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5374
5375 static struct attribute *rbd_attrs[] = {
5376         &dev_attr_size.attr,
5377         &dev_attr_features.attr,
5378         &dev_attr_major.attr,
5379         &dev_attr_minor.attr,
5380         &dev_attr_client_addr.attr,
5381         &dev_attr_client_id.attr,
5382         &dev_attr_cluster_fsid.attr,
5383         &dev_attr_config_info.attr,
5384         &dev_attr_pool.attr,
5385         &dev_attr_pool_id.attr,
5386         &dev_attr_pool_ns.attr,
5387         &dev_attr_name.attr,
5388         &dev_attr_image_id.attr,
5389         &dev_attr_current_snap.attr,
5390         &dev_attr_snap_id.attr,
5391         &dev_attr_parent.attr,
5392         &dev_attr_refresh.attr,
5393         NULL
5394 };
5395
5396 static struct attribute_group rbd_attr_group = {
5397         .attrs = rbd_attrs,
5398 };
5399
5400 static const struct attribute_group *rbd_attr_groups[] = {
5401         &rbd_attr_group,
5402         NULL
5403 };
5404
5405 static void rbd_dev_release(struct device *dev);
5406
5407 static const struct device_type rbd_device_type = {
5408         .name           = "rbd",
5409         .groups         = rbd_attr_groups,
5410         .release        = rbd_dev_release,
5411 };
5412
5413 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5414 {
5415         kref_get(&spec->kref);
5416
5417         return spec;
5418 }
5419
5420 static void rbd_spec_free(struct kref *kref);
5421 static void rbd_spec_put(struct rbd_spec *spec)
5422 {
5423         if (spec)
5424                 kref_put(&spec->kref, rbd_spec_free);
5425 }
5426
5427 static struct rbd_spec *rbd_spec_alloc(void)
5428 {
5429         struct rbd_spec *spec;
5430
5431         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5432         if (!spec)
5433                 return NULL;
5434
5435         spec->pool_id = CEPH_NOPOOL;
5436         spec->snap_id = CEPH_NOSNAP;
5437         kref_init(&spec->kref);
5438
5439         return spec;
5440 }
5441
5442 static void rbd_spec_free(struct kref *kref)
5443 {
5444         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5445
5446         kfree(spec->pool_name);
5447         kfree(spec->pool_ns);
5448         kfree(spec->image_id);
5449         kfree(spec->image_name);
5450         kfree(spec->snap_name);
5451         kfree(spec);
5452 }
5453
5454 static void rbd_dev_free(struct rbd_device *rbd_dev)
5455 {
5456         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5457         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5458
5459         ceph_oid_destroy(&rbd_dev->header_oid);
5460         ceph_oloc_destroy(&rbd_dev->header_oloc);
5461         kfree(rbd_dev->config_info);
5462
5463         rbd_put_client(rbd_dev->rbd_client);
5464         rbd_spec_put(rbd_dev->spec);
5465         kfree(rbd_dev->opts);
5466         kfree(rbd_dev);
5467 }
5468
5469 static void rbd_dev_release(struct device *dev)
5470 {
5471         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5472         bool need_put = !!rbd_dev->opts;
5473
5474         if (need_put) {
5475                 destroy_workqueue(rbd_dev->task_wq);
5476                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5477         }
5478
5479         rbd_dev_free(rbd_dev);
5480
5481         /*
5482          * This is racy, but way better than dropping the module ref
5483          * outside of the release callback.  The race window is small, so
5484          * doing something similar to dm (dm-builtin.c) is overkill.
5485          */
5486         if (need_put)
5487                 module_put(THIS_MODULE);
5488 }
5489
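/*
 * Allocate and initialize an rbd_device: locks, work items, object
 * locator and the embedded struct device.  Map options and device id
 * are filled in by rbd_dev_create().
 */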
5490 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
5491                                            struct rbd_spec *spec)
5492 {
5493         struct rbd_device *rbd_dev;
5494
5495         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5496         if (!rbd_dev)
5497                 return NULL;
5498
5499         spin_lock_init(&rbd_dev->lock);
5500         INIT_LIST_HEAD(&rbd_dev->node);
5501         init_rwsem(&rbd_dev->header_rwsem);
5502
5503         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5504         ceph_oid_init(&rbd_dev->header_oid);
5505         rbd_dev->header_oloc.pool = spec->pool_id;
5506         if (spec->pool_ns) {
5507                 WARN_ON(!*spec->pool_ns);
5508                 rbd_dev->header_oloc.pool_ns =
5509                     ceph_find_or_create_string(spec->pool_ns,
5510                                                strlen(spec->pool_ns));
5511         }
5512
5513         mutex_init(&rbd_dev->watch_mutex);
5514         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5515         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5516
5517         init_rwsem(&rbd_dev->lock_rwsem);
5518         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5519         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5520         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5521         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5522         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5523         spin_lock_init(&rbd_dev->lock_lists_lock);
5524         INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5525         INIT_LIST_HEAD(&rbd_dev->running_list);
5526         init_completion(&rbd_dev->acquire_wait);
5527         init_completion(&rbd_dev->releasing_wait);
5528
5529         spin_lock_init(&rbd_dev->object_map_lock);
5530
5531         rbd_dev->dev.bus = &rbd_bus_type;
5532         rbd_dev->dev.type = &rbd_device_type;
5533         rbd_dev->dev.parent = &rbd_root_dev;
5534         device_initialize(&rbd_dev->dev);
5535
5536         rbd_dev->rbd_client = rbdc;
5537         rbd_dev->spec = spec;
5538
5539         return rbd_dev;
5540 }
5541
5542 /*
5543  * Create a mapping rbd_dev.
5544  */
5545 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5546                                          struct rbd_spec *spec,
5547                                          struct rbd_options *opts)
5548 {
5549         struct rbd_device *rbd_dev;
5550
5551         rbd_dev = __rbd_dev_create(rbdc, spec);
5552         if (!rbd_dev)
5553                 return NULL;
5554
5555         rbd_dev->opts = opts;
5556
5557         /* get an id and fill in device name */
5558         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5559                                          minor_to_rbd_dev_id(1 << MINORBITS),
5560                                          GFP_KERNEL);
5561         if (rbd_dev->dev_id < 0)
5562                 goto fail_rbd_dev;
5563
5564         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5565         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5566                                                    rbd_dev->name);
5567         if (!rbd_dev->task_wq)
5568                 goto fail_dev_id;
5569
5570         /* we have a ref from do_rbd_add() */
5571         __module_get(THIS_MODULE);
5572
5573         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5574         return rbd_dev;
5575
5576 fail_dev_id:
5577         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5578 fail_rbd_dev:
5579         rbd_dev_free(rbd_dev);
5580         return NULL;
5581 }
5582
5583 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5584 {
5585         if (rbd_dev)
5586                 put_device(&rbd_dev->dev);
5587 }
5588
5589 /*
5590  * Get the size and object order for an image snapshot, or if
5591  * snap_id is CEPH_NOSNAP, get this information for the base
5592  * image.
5593  */
5594 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5595                                 u8 *order, u64 *snap_size)
5596 {
5597         __le64 snapid = cpu_to_le64(snap_id);
5598         int ret;
5599         struct {
5600                 u8 order;
5601                 __le64 size;
5602         } __attribute__ ((packed)) size_buf = { 0 };
5603
5604         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5605                                   &rbd_dev->header_oloc, "get_size",
5606                                   &snapid, sizeof(snapid),
5607                                   &size_buf, sizeof(size_buf));
5608         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5609         if (ret < 0)
5610                 return ret;
5611         if (ret < sizeof (size_buf))
5612                 return -ERANGE;
5613
5614         if (order) {
5615                 *order = size_buf.order;
5616                 dout("  order %u", (unsigned int)*order);
5617         }
5618         *snap_size = le64_to_cpu(size_buf.size);
5619
5620         dout("  snap_id 0x%016llx snap_size = %llu\n",
5621                 (unsigned long long)snap_id,
5622                 (unsigned long long)*snap_size);
5623
5624         return 0;
5625 }
5626
5627 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5628 {
5629         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5630                                         &rbd_dev->header.obj_order,
5631                                         &rbd_dev->header.image_size);
5632 }
5633
5634 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5635 {
5636         size_t size;
5637         void *reply_buf;
5638         int ret;
5639         void *p;
5640
5641         /* Response will be an encoded string, which includes a length */
5642         size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5643         reply_buf = kzalloc(size, GFP_KERNEL);
5644         if (!reply_buf)
5645                 return -ENOMEM;
5646
5647         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5648                                   &rbd_dev->header_oloc, "get_object_prefix",
5649                                   NULL, 0, reply_buf, size);
5650         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5651         if (ret < 0)
5652                 goto out;
5653
5654         p = reply_buf;
5655         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5656                                                 p + ret, NULL, GFP_NOIO);
5657         ret = 0;
5658
5659         if (IS_ERR(rbd_dev->header.object_prefix)) {
5660                 ret = PTR_ERR(rbd_dev->header.object_prefix);
5661                 rbd_dev->header.object_prefix = NULL;
5662         } else {
5663                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5664         }
5665 out:
5666         kfree(reply_buf);
5667
5668         return ret;
5669 }
5670
5671 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5672                 u64 *snap_features)
5673 {
5674         __le64 snapid = cpu_to_le64(snap_id);
5675         struct {
5676                 __le64 features;
5677                 __le64 incompat;
5678         } __attribute__ ((packed)) features_buf = { 0 };
5679         u64 unsup;
5680         int ret;
5681
5682         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5683                                   &rbd_dev->header_oloc, "get_features",
5684                                   &snapid, sizeof(snapid),
5685                                   &features_buf, sizeof(features_buf));
5686         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5687         if (ret < 0)
5688                 return ret;
5689         if (ret < sizeof (features_buf))
5690                 return -ERANGE;
5691
5692         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5693         if (unsup) {
5694                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5695                          unsup);
5696                 return -ENXIO;
5697         }
5698
5699         *snap_features = le64_to_cpu(features_buf.features);
5700
5701         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5702                 (unsigned long long)snap_id,
5703                 (unsigned long long)*snap_features,
5704                 (unsigned long long)le64_to_cpu(features_buf.incompat));
5705
5706         return 0;
5707 }
5708
5709 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5710 {
5711         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5712                                                 &rbd_dev->header.features);
5713 }
5714
5715 /*
5716  * These are generic image flags, but since they are used only for
5717  * object map, store them in rbd_dev->object_map_flags.
5718  *
5719  * For the same reason, this function is called only on object map
5720  * (re)load and not on header refresh.
5721  */
5722 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5723 {
5724         __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5725         __le64 flags;
5726         int ret;
5727
5728         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5729                                   &rbd_dev->header_oloc, "get_flags",
5730                                   &snapid, sizeof(snapid),
5731                                   &flags, sizeof(flags));
5732         if (ret < 0)
5733                 return ret;
5734         if (ret < sizeof(flags))
5735                 return -EBADMSG;
5736
5737         rbd_dev->object_map_flags = le64_to_cpu(flags);
5738         return 0;
5739 }
5740
5741 struct parent_image_info {
5742         u64             pool_id;
5743         const char      *pool_ns;
5744         const char      *image_id;
5745         u64             snap_id;
5746
5747         bool            has_overlap;
5748         u64             overlap;
5749 };
5750
5751 /*
5752  * The caller is responsible for @pii.
5753  */
5754 static int decode_parent_image_spec(void **p, void *end,
5755                                     struct parent_image_info *pii)
5756 {
5757         u8 struct_v;
5758         u32 struct_len;
5759         int ret;
5760
5761         ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5762                                   &struct_v, &struct_len);
5763         if (ret)
5764                 return ret;
5765
5766         ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5767         pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5768         if (IS_ERR(pii->pool_ns)) {
5769                 ret = PTR_ERR(pii->pool_ns);
5770                 pii->pool_ns = NULL;
5771                 return ret;
5772         }
5773         pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5774         if (IS_ERR(pii->image_id)) {
5775                 ret = PTR_ERR(pii->image_id);
5776                 pii->image_id = NULL;
5777                 return ret;
5778         }
5779         ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5780         return 0;
5781
5782 e_inval:
5783         return -EINVAL;
5784 }
5785
5786 static int __get_parent_info(struct rbd_device *rbd_dev,
5787                              struct page *req_page,
5788                              struct page *reply_page,
5789                              struct parent_image_info *pii)
5790 {
5791         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5792         size_t reply_len = PAGE_SIZE;
5793         void *p, *end;
5794         int ret;
5795
5796         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5797                              "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5798                              req_page, sizeof(u64), &reply_page, &reply_len);
5799         if (ret)
5800                 return ret == -EOPNOTSUPP ? 1 : ret;
5801
5802         p = page_address(reply_page);
5803         end = p + reply_len;
5804         ret = decode_parent_image_spec(&p, end, pii);
5805         if (ret)
5806                 return ret;
5807
5808         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5809                              "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5810                              req_page, sizeof(u64), &reply_page, &reply_len);
5811         if (ret)
5812                 return ret;
5813
5814         p = page_address(reply_page);
5815         end = p + reply_len;
5816         ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5817         if (pii->has_overlap)
5818                 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5819
5820         return 0;
5821
5822 e_inval:
5823         return -EINVAL;
5824 }
5825
5826 /*
5827  * The caller is responsible for @pii.
5828  */
5829 static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5830                                     struct page *req_page,
5831                                     struct page *reply_page,
5832                                     struct parent_image_info *pii)
5833 {
5834         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5835         size_t reply_len = PAGE_SIZE;
5836         void *p, *end;
5837         int ret;
5838
5839         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5840                              "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5841                              req_page, sizeof(u64), &reply_page, &reply_len);
5842         if (ret)
5843                 return ret;
5844
5845         p = page_address(reply_page);
5846         end = p + reply_len;
5847         ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5848         pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5849         if (IS_ERR(pii->image_id)) {
5850                 ret = PTR_ERR(pii->image_id);
5851                 pii->image_id = NULL;
5852                 return ret;
5853         }
5854         ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5855         pii->has_overlap = true;
5856         ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5857
5858         return 0;
5859
5860 e_inval:
5861         return -EINVAL;
5862 }
5863
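/*
 * Fetch parent image info for the mapped snapshot, preferring the
 * "parent_get"/"parent_overlap_get" methods and falling back to the
 * legacy "get_parent" method on older OSDs.
 */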
5864 static int get_parent_info(struct rbd_device *rbd_dev,
5865                            struct parent_image_info *pii)
5866 {
5867         struct page *req_page, *reply_page;
5868         void *p;
5869         int ret;
5870
5871         req_page = alloc_page(GFP_KERNEL);
5872         if (!req_page)
5873                 return -ENOMEM;
5874
5875         reply_page = alloc_page(GFP_KERNEL);
5876         if (!reply_page) {
5877                 __free_page(req_page);
5878                 return -ENOMEM;
5879         }
5880
5881         p = page_address(req_page);
5882         ceph_encode_64(&p, rbd_dev->spec->snap_id);
5883         ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5884         if (ret > 0)
5885                 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5886                                                pii);
5887
5888         __free_page(req_page);
5889         __free_page(reply_page);
5890         return ret;
5891 }
5892
5893 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5894 {
5895         struct rbd_spec *parent_spec;
5896         struct parent_image_info pii = { 0 };
5897         int ret;
5898
5899         parent_spec = rbd_spec_alloc();
5900         if (!parent_spec)
5901                 return -ENOMEM;
5902
5903         ret = get_parent_info(rbd_dev, &pii);
5904         if (ret)
5905                 goto out_err;
5906
5907         dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5908              __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5909              pii.has_overlap, pii.overlap);
5910
5911         if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5912                 /*
5913                  * Either the parent never existed, or we have
5914                  * a record of it but the image got flattened so it no
5915                  * longer has a parent.  When the parent of a
5916                  * layered image disappears we immediately set the
5917                  * overlap to 0.  The effect of this is that all new
5918                  * requests will be treated as if the image had no
5919                  * parent.
5920                  *
5921                  * If !pii.has_overlap, the parent image spec is not
5922                  * applicable.  It's there to avoid duplication in each
5923                  * snapshot record.
5924                  */
5925                 if (rbd_dev->parent_overlap) {
5926                         rbd_dev->parent_overlap = 0;
5927                         rbd_dev_parent_put(rbd_dev);
5928                         pr_info("%s: clone image has been flattened\n",
5929                                 rbd_dev->disk->disk_name);
5930                 }
5931
5932                 goto out;       /* No parent?  No problem. */
5933         }
5934
5935         /* The ceph file layout needs to fit pool id in 32 bits */
5936
5937         ret = -EIO;
5938         if (pii.pool_id > (u64)U32_MAX) {
5939                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5940                         (unsigned long long)pii.pool_id, U32_MAX);
5941                 goto out_err;
5942         }
5943
5944         /*
5945          * The parent won't change (except when the clone is
5946          * flattened, which is handled above).  So we only need to
5947          * record the parent spec if we have not already done so.
5948          */
5949         if (!rbd_dev->parent_spec) {
5950                 parent_spec->pool_id = pii.pool_id;
5951                 if (pii.pool_ns && *pii.pool_ns) {
5952                         parent_spec->pool_ns = pii.pool_ns;
5953                         pii.pool_ns = NULL;
5954                 }
5955                 parent_spec->image_id = pii.image_id;
5956                 pii.image_id = NULL;
5957                 parent_spec->snap_id = pii.snap_id;
5958
5959                 rbd_dev->parent_spec = parent_spec;
5960                 parent_spec = NULL;     /* rbd_dev now owns this */
5961         }
5962
5963         /*
5964          * We always update the parent overlap.  If it's zero we issue
5965          * a warning, as we will proceed as if there was no parent.
5966          */
5967         if (!pii.overlap) {
5968                 if (parent_spec) {
5969                         /* refresh, careful to warn just once */
5970                         if (rbd_dev->parent_overlap)
5971                                 rbd_warn(rbd_dev,
5972                                     "clone now standalone (overlap became 0)");
5973                 } else {
5974                         /* initial probe */
5975                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5976                 }
5977         }
5978         rbd_dev->parent_overlap = pii.overlap;
5979
5980 out:
5981         ret = 0;
5982 out_err:
5983         kfree(pii.pool_ns);
5984         kfree(pii.image_id);
5985         rbd_spec_put(parent_spec);
5986         return ret;
5987 }
5988
5989 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5990 {
5991         struct {
5992                 __le64 stripe_unit;
5993                 __le64 stripe_count;
5994         } __attribute__ ((packed)) striping_info_buf = { 0 };
5995         size_t size = sizeof (striping_info_buf);
5996         void *p;
5997         int ret;
5998
5999         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6000                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
6001                                 NULL, 0, &striping_info_buf, size);
6002         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6003         if (ret < 0)
6004                 return ret;
6005         if (ret < size)
6006                 return -ERANGE;
6007
6008         p = &striping_info_buf;
6009         rbd_dev->header.stripe_unit = ceph_decode_64(&p);
6010         rbd_dev->header.stripe_count = ceph_decode_64(&p);
6011         return 0;
6012 }
6013
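/*
 * Look up the id of the image's data pool via the "get_data_pool"
 * method of the header object and record it in the in-core header.
 */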
6014 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
6015 {
6016         __le64 data_pool_id;
6017         int ret;
6018
6019         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6020                                   &rbd_dev->header_oloc, "get_data_pool",
6021                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
6022         if (ret < 0)
6023                 return ret;
6024         if (ret < sizeof(data_pool_id))
6025                 return -EBADMSG;
6026
6027         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
6028         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
6029         return 0;
6030 }
6031
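/*
 * Look up the name associated with rbd_dev's image id in the pool's
 * rbd directory object ("dir_get_name").  Returns a dynamically
 * allocated copy of the name, or NULL if it can't be determined.
 */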
6032 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
6033 {
6034         CEPH_DEFINE_OID_ONSTACK(oid);
6035         size_t image_id_size;
6036         char *image_id;
6037         void *p;
6038         void *end;
6039         size_t size;
6040         void *reply_buf = NULL;
6041         size_t len = 0;
6042         char *image_name = NULL;
6043         int ret;
6044
6045         rbd_assert(!rbd_dev->spec->image_name);
6046
6047         len = strlen(rbd_dev->spec->image_id);
6048         image_id_size = sizeof (__le32) + len;
6049         image_id = kmalloc(image_id_size, GFP_KERNEL);
6050         if (!image_id)
6051                 return NULL;
6052
6053         p = image_id;
6054         end = image_id + image_id_size;
6055         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
6056
6057         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
6058         reply_buf = kmalloc(size, GFP_KERNEL);
6059         if (!reply_buf)
6060                 goto out;
6061
6062         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
6063         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6064                                   "dir_get_name", image_id, image_id_size,
6065                                   reply_buf, size);
6066         if (ret < 0)
6067                 goto out;
6068         p = reply_buf;
6069         end = reply_buf + ret;
6070
6071         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
6072         if (IS_ERR(image_name))
6073                 image_name = NULL;
6074         else
6075                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
6076 out:
6077         kfree(reply_buf);
6078         kfree(image_id);
6079
6080         return image_name;
6081 }
6082
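/*
 * Find the id of the snapshot with the given name in a format 1 image
 * by walking the header's packed list of snapshot names.  Returns
 * CEPH_NOSNAP if no snapshot by that name exists.
 */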
6083 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6084 {
6085         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6086         const char *snap_name;
6087         u32 which = 0;
6088
6089         /* Skip over names until we find the one we are looking for */
6090
6091         snap_name = rbd_dev->header.snap_names;
6092         while (which < snapc->num_snaps) {
6093                 if (!strcmp(name, snap_name))
6094                         return snapc->snaps[which];
6095                 snap_name += strlen(snap_name) + 1;
6096                 which++;
6097         }
6098         return CEPH_NOSNAP;
6099 }
6100
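/*
 * Format 2 variant of the lookup above: fetch each snapshot's name
 * from the header object and compare it against the requested name.
 */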
6101 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6102 {
6103         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6104         u32 which;
6105         bool found = false;
6106         u64 snap_id;
6107
6108         for (which = 0; !found && which < snapc->num_snaps; which++) {
6109                 const char *snap_name;
6110
6111                 snap_id = snapc->snaps[which];
6112                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
6113                 if (IS_ERR(snap_name)) {
6114                         /* ignore no-longer existing snapshots */
6115                         if (PTR_ERR(snap_name) == -ENOENT)
6116                                 continue;
6117                         else
6118                                 break;
6119                 }
6120                 found = !strcmp(name, snap_name);
6121                 kfree(snap_name);
6122         }
6123         return found ? snap_id : CEPH_NOSNAP;
6124 }
6125
6126 /*
6127  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
6128  * no snapshot by that name is found, or if an error occurs.
6129  */
6130 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6131 {
6132         if (rbd_dev->image_format == 1)
6133                 return rbd_v1_snap_id_by_name(rbd_dev, name);
6134
6135         return rbd_v2_snap_id_by_name(rbd_dev, name);
6136 }
6137
6138 /*
6139  * An image being mapped will have everything but the snap id.
6140  */
6141 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
6142 {
6143         struct rbd_spec *spec = rbd_dev->spec;
6144
6145         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
6146         rbd_assert(spec->image_id && spec->image_name);
6147         rbd_assert(spec->snap_name);
6148
6149         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
6150                 u64 snap_id;
6151
6152                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
6153                 if (snap_id == CEPH_NOSNAP)
6154                         return -ENOENT;
6155
6156                 spec->snap_id = snap_id;
6157         } else {
6158                 spec->snap_id = CEPH_NOSNAP;
6159         }
6160
6161         return 0;
6162 }
6163
6164 /*
6165  * A parent image will have all ids but none of the names.
6166  *
6167  * All names in an rbd spec are dynamically allocated.  It's OK if we
6168  * can't figure out the name for an image id.
6169  */
6170 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
6171 {
6172         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6173         struct rbd_spec *spec = rbd_dev->spec;
6174         const char *pool_name;
6175         const char *image_name;
6176         const char *snap_name;
6177         int ret;
6178
6179         rbd_assert(spec->pool_id != CEPH_NOPOOL);
6180         rbd_assert(spec->image_id);
6181         rbd_assert(spec->snap_id != CEPH_NOSNAP);
6182
6183         /* Get the pool name; we have to make our own copy of this */
6184
6185         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6186         if (!pool_name) {
6187                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6188                 return -EIO;
6189         }
6190         pool_name = kstrdup(pool_name, GFP_KERNEL);
6191         if (!pool_name)
6192                 return -ENOMEM;
6193
6194         /* Fetch the image name; tolerate failure here */
6195
6196         image_name = rbd_dev_image_name(rbd_dev);
6197         if (!image_name)
6198                 rbd_warn(rbd_dev, "unable to get image name");
6199
6200         /* Fetch the snapshot name */
6201
6202         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6203         if (IS_ERR(snap_name)) {
6204                 ret = PTR_ERR(snap_name);
6205                 goto out_err;
6206         }
6207
6208         spec->pool_name = pool_name;
6209         spec->image_name = image_name;
6210         spec->snap_name = snap_name;
6211
6212         return 0;
6213
6214 out_err:
6215         kfree(image_name);
6216         kfree(pool_name);
6217         return ret;
6218 }
6219
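/*
 * Fetch the image's snapshot context via the "get_snapcontext" method
 * and install it in place of the one currently recorded in the header.
 */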
6220 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6221 {
6222         size_t size;
6223         int ret;
6224         void *reply_buf;
6225         void *p;
6226         void *end;
6227         u64 seq;
6228         u32 snap_count;
6229         struct ceph_snap_context *snapc;
6230         u32 i;
6231
6232         /*
6233          * We'll need room for the seq value (maximum snapshot id),
6234          * the snapshot count, and an array of that many snapshot ids.
6235          * For now we have a fixed upper limit on the number we're
6236          * prepared to receive.
6237          */
6238         size = sizeof (__le64) + sizeof (__le32) +
6239                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
6240         reply_buf = kzalloc(size, GFP_KERNEL);
6241         if (!reply_buf)
6242                 return -ENOMEM;
6243
6244         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6245                                   &rbd_dev->header_oloc, "get_snapcontext",
6246                                   NULL, 0, reply_buf, size);
6247         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6248         if (ret < 0)
6249                 goto out;
6250
6251         p = reply_buf;
6252         end = reply_buf + ret;
6253         ret = -ERANGE;
6254         ceph_decode_64_safe(&p, end, seq, out);
6255         ceph_decode_32_safe(&p, end, snap_count, out);
6256
6257         /*
6258          * Make sure the reported number of snapshot ids wouldn't go
6259          * beyond the end of our buffer.  But before checking that,
6260          * make sure the computed size of the snapshot context we
6261          * allocate is representable in a size_t.
6262          */
6263         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6264                                  / sizeof (u64)) {
6265                 ret = -EINVAL;
6266                 goto out;
6267         }
6268         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6269                 goto out;
6270         ret = 0;
6271
6272         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6273         if (!snapc) {
6274                 ret = -ENOMEM;
6275                 goto out;
6276         }
6277         snapc->seq = seq;
6278         for (i = 0; i < snap_count; i++)
6279                 snapc->snaps[i] = ceph_decode_64(&p);
6280
6281         ceph_put_snap_context(rbd_dev->header.snapc);
6282         rbd_dev->header.snapc = snapc;
6283
6284         dout("  snap context seq = %llu, snap_count = %u\n",
6285                 (unsigned long long)seq, (unsigned int)snap_count);
6286 out:
6287         kfree(reply_buf);
6288
6289         return ret;
6290 }
6291
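/*
 * Fetch the name of the snapshot with the given id via the
 * "get_snapshot_name" method.  Returns a dynamically allocated
 * string, or an ERR_PTR() on failure.
 */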
6292 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6293                                         u64 snap_id)
6294 {
6295         size_t size;
6296         void *reply_buf;
6297         __le64 snapid;
6298         int ret;
6299         void *p;
6300         void *end;
6301         char *snap_name;
6302
6303         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6304         reply_buf = kmalloc(size, GFP_KERNEL);
6305         if (!reply_buf)
6306                 return ERR_PTR(-ENOMEM);
6307
6308         snapid = cpu_to_le64(snap_id);
6309         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6310                                   &rbd_dev->header_oloc, "get_snapshot_name",
6311                                   &snapid, sizeof(snapid), reply_buf, size);
6312         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6313         if (ret < 0) {
6314                 snap_name = ERR_PTR(ret);
6315                 goto out;
6316         }
6317
6318         p = reply_buf;
6319         end = reply_buf + ret;
6320         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6321         if (IS_ERR(snap_name))
6322                 goto out;
6323
6324         dout("  snap_id 0x%016llx snap_name = %s\n",
6325                 (unsigned long long)snap_id, snap_name);
6326 out:
6327         kfree(reply_buf);
6328
6329         return snap_name;
6330 }
6331
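/*
 * Refresh the format 2 image header: the image size, the one-time
 * fields on the initial call, and the snapshot context.
 */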
6332 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6333 {
6334         bool first_time = rbd_dev->header.object_prefix == NULL;
6335         int ret;
6336
6337         ret = rbd_dev_v2_image_size(rbd_dev);
6338         if (ret)
6339                 return ret;
6340
6341         if (first_time) {
6342                 ret = rbd_dev_v2_header_onetime(rbd_dev);
6343                 if (ret)
6344                         return ret;
6345         }
6346
6347         ret = rbd_dev_v2_snap_context(rbd_dev);
6348         if (ret && first_time) {
6349                 kfree(rbd_dev->header.object_prefix);
6350                 rbd_dev->header.object_prefix = NULL;
6351         }
6352
6353         return ret;
6354 }
6355
6356 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6357 {
6358         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6359
6360         if (rbd_dev->image_format == 1)
6361                 return rbd_dev_v1_header_info(rbd_dev);
6362
6363         return rbd_dev_v2_header_info(rbd_dev);
6364 }
6365
6366 /*
6367  * Skips over white space at *buf, and updates *buf to point to the
6368  * first found non-space character (if any). Returns the length of
6369  * the token (string of non-white space characters) found.  Note
6370  * that *buf must be terminated with '\0'.
6371  */
6372 static inline size_t next_token(const char **buf)
6373 {
6374         /*
6375          * These are the characters that produce nonzero for
6376          * isspace() in the "C" and "POSIX" locales.
6377          */
6378         const char *spaces = " \f\n\r\t\v";
6379
6380         *buf += strspn(*buf, spaces);   /* Find start of token */
6381
6382         return strcspn(*buf, spaces);   /* Return token length */
6383 }
6384
6385 /*
6386  * Finds the next token in *buf, dynamically allocates a buffer big
6387  * enough to hold a copy of it, and copies the token into the new
6388  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
6389  * that a duplicate buffer is created even for a zero-length token.
6390  *
6391  * Returns a pointer to the newly-allocated duplicate, or a null
6392  * pointer if memory for the duplicate was not available.  If
6393  * the lenp argument is a non-null pointer, the length of the token
6394  * (not including the '\0') is returned in *lenp.
6395  *
6396  * If successful, the *buf pointer will be updated to point beyond
6397  * the end of the found token.
6398  *
6399  * Note: uses GFP_KERNEL for allocation.
6400  */
6401 static inline char *dup_token(const char **buf, size_t *lenp)
6402 {
6403         char *dup;
6404         size_t len;
6405
6406         len = next_token(buf);
6407         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6408         if (!dup)
6409                 return NULL;
6410         *(dup + len) = '\0';
6411         *buf += len;
6412
6413         if (lenp)
6414                 *lenp = len;
6415
6416         return dup;
6417 }
6418
6419 /*
6420  * Parse the options provided for an "rbd add" (i.e., rbd image
6421  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
6422  * and the data written is passed here via a NUL-terminated buffer.
6423  * Returns 0 if successful or an error code otherwise.
6424  *
6425  * The information extracted from these options is recorded in
6426  * the other parameters which return dynamically-allocated
6427  * structures:
6428  *  ceph_opts
6429  *      The address of a pointer that will refer to a ceph options
6430  *      structure.  Caller must release the returned pointer using
6431  *      ceph_destroy_options() when it is no longer needed.
6432  *  rbd_opts
6433  *      Address of an rbd options pointer.  Fully initialized by
6434  *      this function; caller must release with kfree().
6435  *  spec
6436  *      Address of an rbd image specification pointer.  Fully
6437  *      initialized by this function based on parsed options.
6438  *      Caller must release with rbd_spec_put().
6439  *
6440  * The options passed take this form:
6441  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
6442  * where:
6443  *  <mon_addrs>
6444  *      A comma-separated list of one or more monitor addresses.
6445  *      A monitor address is an ip address, optionally followed
6446  *      by a port number (separated by a colon).
6447  *        I.e.:  ip1[:port1][,ip2[:port2]...]
6448  *  <options>
6449  *      A comma-separated list of ceph and/or rbd options.
6450  *  <pool_name>
6451  *      The name of the rados pool containing the rbd image.
6452  *  <image_name>
6453  *      The name of the image in that pool to map.
6454  *  <snap_name>
6455  *      An optional snapshot name.  If provided, the mapping will
6456  *      present data from the image at the time that snapshot was
6457  *      created.  The image head is used if no snapshot name is
6458  *      provided.  Snapshot mappings are always read-only.
6459  */
6460 static int rbd_add_parse_args(const char *buf,
6461                                 struct ceph_options **ceph_opts,
6462                                 struct rbd_options **opts,
6463                                 struct rbd_spec **rbd_spec)
6464 {
6465         size_t len;
6466         char *options;
6467         const char *mon_addrs;
6468         char *snap_name;
6469         size_t mon_addrs_size;
6470         struct parse_rbd_opts_ctx pctx = { 0 };
6471         struct ceph_options *copts;
6472         int ret;
6473
6474         /* The first four tokens are required */
6475
6476         len = next_token(&buf);
6477         if (!len) {
6478                 rbd_warn(NULL, "no monitor address(es) provided");
6479                 return -EINVAL;
6480         }
6481         mon_addrs = buf;
6482         mon_addrs_size = len + 1;
6483         buf += len;
6484
6485         ret = -EINVAL;
6486         options = dup_token(&buf, NULL);
6487         if (!options)
6488                 return -ENOMEM;
6489         if (!*options) {
6490                 rbd_warn(NULL, "no options provided");
6491                 goto out_err;
6492         }
6493
6494         pctx.spec = rbd_spec_alloc();
6495         if (!pctx.spec)
6496                 goto out_mem;
6497
6498         pctx.spec->pool_name = dup_token(&buf, NULL);
6499         if (!pctx.spec->pool_name)
6500                 goto out_mem;
6501         if (!*pctx.spec->pool_name) {
6502                 rbd_warn(NULL, "no pool name provided");
6503                 goto out_err;
6504         }
6505
6506         pctx.spec->image_name = dup_token(&buf, NULL);
6507         if (!pctx.spec->image_name)
6508                 goto out_mem;
6509         if (!*pctx.spec->image_name) {
6510                 rbd_warn(NULL, "no image name provided");
6511                 goto out_err;
6512         }
6513
6514         /*
6515          * Snapshot name is optional; default is to use "-"
6516          * (indicating the head/no snapshot).
6517          */
6518         len = next_token(&buf);
6519         if (!len) {
6520                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6521                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6522         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6523                 ret = -ENAMETOOLONG;
6524                 goto out_err;
6525         }
6526         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6527         if (!snap_name)
6528                 goto out_mem;
6529         *(snap_name + len) = '\0';
6530         pctx.spec->snap_name = snap_name;
6531
6532         /* Initialize all rbd options to the defaults */
6533
6534         pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6535         if (!pctx.opts)
6536                 goto out_mem;
6537
6538         pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6539         pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6540         pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6541         pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6542         pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6543         pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6544         pctx.opts->trim = RBD_TRIM_DEFAULT;
6545
6546         copts = ceph_parse_options(options, mon_addrs,
6547                                    mon_addrs + mon_addrs_size - 1,
6548                                    parse_rbd_opts_token, &pctx);
6549         if (IS_ERR(copts)) {
6550                 ret = PTR_ERR(copts);
6551                 goto out_err;
6552         }
6553         kfree(options);
6554
6555         *ceph_opts = copts;
6556         *opts = pctx.opts;
6557         *rbd_spec = pctx.spec;
6558
6559         return 0;
6560 out_mem:
6561         ret = -ENOMEM;
6562 out_err:
6563         kfree(pctx.opts);
6564         rbd_spec_put(pctx.spec);
6565         kfree(options);
6566
6567         return ret;
6568 }
6569
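/* Release the exclusive lock if this device currently owns it. */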
6570 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6571 {
6572         down_write(&rbd_dev->lock_rwsem);
6573         if (__rbd_is_lock_owner(rbd_dev))
6574                 __rbd_release_lock(rbd_dev);
6575         up_write(&rbd_dev->lock_rwsem);
6576 }
6577
6578 /*
6579  * If the wait is interrupted, an error is returned even if the lock
6580  * was successfully acquired.  rbd_dev_image_unlock() will release it
6581  * if needed.
6582  */
6583 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6584 {
6585         long ret;
6586
6587         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6588                 if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6589                         return 0;
6590
6591                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6592                 return -EINVAL;
6593         }
6594
6595         if (rbd_is_ro(rbd_dev))
6596                 return 0;
6597
6598         rbd_assert(!rbd_is_lock_owner(rbd_dev));
6599         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6600         ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6601                             ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6602         if (ret > 0) {
6603                 ret = rbd_dev->acquire_err;
6604         } else {
6605                 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6606                 if (!ret)
6607                         ret = -ETIMEDOUT;
6608         }
6609
6610         if (ret) {
6611                 rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6612                 return ret;
6613         }
6614
6615         /*
6616          * The lock may have been released by now, unless automatic lock
6617          * transitions are disabled.
6618          */
6619         rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6620         return 0;
6621 }
6622
6623 /*
6624  * An rbd format 2 image has a unique identifier, distinct from the
6625  * name given to it by the user.  Internally, that identifier is
6626  * what's used to specify the names of objects related to the image.
6627  *
6628  * A special "rbd id" object is used to map an rbd image name to its
6629  * id.  If that object doesn't exist, then there is no v2 rbd image
6630  * with the supplied name.
6631  *
6632  * This function will record the given rbd_dev's image_id field if
6633  * it can be determined, and in that case will return 0.  If any
6634  * errors occur a negative errno will be returned and the rbd_dev's
6635  * image_id field will be unchanged (and should be NULL).
6636  */
6637 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6638 {
6639         int ret;
6640         size_t size;
6641         CEPH_DEFINE_OID_ONSTACK(oid);
6642         void *response;
6643         char *image_id;
6644
6645         /*
6646          * When probing a parent image, the image id is already
6647          * known (and the image name likely is not).  There's no
6648          * need to fetch the image id again in this case.  We
6649          * do still need to set the image format though.
6650          */
6651         if (rbd_dev->spec->image_id) {
6652                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6653
6654                 return 0;
6655         }
6656
6657         /*
6658          * First, see if the format 2 image id file exists, and if
6659          * so, get the image's persistent id from it.
6660          */
6661         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6662                                rbd_dev->spec->image_name);
6663         if (ret)
6664                 return ret;
6665
6666         dout("rbd id object name is %s\n", oid.name);
6667
6668         /* Response will be an encoded string, which includes a length */
6669         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6670         response = kzalloc(size, GFP_NOIO);
6671         if (!response) {
6672                 ret = -ENOMEM;
6673                 goto out;
6674         }
6675
6676         /* If it doesn't exist we'll assume it's a format 1 image */
6677
6678         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6679                                   "get_id", NULL, 0,
6680                                   response, size);
6681         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6682         if (ret == -ENOENT) {
6683                 image_id = kstrdup("", GFP_KERNEL);
6684                 ret = image_id ? 0 : -ENOMEM;
6685                 if (!ret)
6686                         rbd_dev->image_format = 1;
6687         } else if (ret >= 0) {
6688                 void *p = response;
6689
6690                 image_id = ceph_extract_encoded_string(&p, p + ret,
6691                                                 NULL, GFP_NOIO);
6692                 ret = PTR_ERR_OR_ZERO(image_id);
6693                 if (!ret)
6694                         rbd_dev->image_format = 2;
6695         }
6696
6697         if (!ret) {
6698                 rbd_dev->spec->image_id = image_id;
6699                 dout("image_id is %s\n", image_id);
6700         }
6701 out:
6702         kfree(response);
6703         ceph_oid_destroy(&oid);
6704         return ret;
6705 }
6706
6707 /*
6708  * Undo whatever state changes are made by the v1 or v2 header
6709  * info calls.
6710  */
6711 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6712 {
6713         struct rbd_image_header *header;
6714
6715         rbd_dev_parent_put(rbd_dev);
6716         rbd_object_map_free(rbd_dev);
6717         rbd_dev_mapping_clear(rbd_dev);
6718
6719         /* Free dynamic fields from the header, then zero it out */
6720
6721         header = &rbd_dev->header;
6722         ceph_put_snap_context(header->snapc);
6723         kfree(header->snap_sizes);
6724         kfree(header->snap_names);
6725         kfree(header->object_prefix);
6726         memset(header, 0, sizeof (*header));
6727 }
6728
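/*
 * Fetch the parts of a format 2 header that are set once at probe
 * time: object prefix, features, striping parameters and data pool.
 */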
6729 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6730 {
6731         int ret;
6732
6733         ret = rbd_dev_v2_object_prefix(rbd_dev);
6734         if (ret)
6735                 goto out_err;
6736
6737         /*
6738          * Get and check the features for the image.  Currently the
6739          * features are assumed to never change.
6740          */
6741         ret = rbd_dev_v2_features(rbd_dev);
6742         if (ret)
6743                 goto out_err;
6744
6745         /* If the image supports fancy striping, get its parameters */
6746
6747         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6748                 ret = rbd_dev_v2_striping_info(rbd_dev);
6749                 if (ret < 0)
6750                         goto out_err;
6751         }
6752
6753         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6754                 ret = rbd_dev_v2_data_pool(rbd_dev);
6755                 if (ret)
6756                         goto out_err;
6757         }
6758
6759         rbd_init_layout(rbd_dev);
6760         return 0;
6761
6762 out_err:
6763         rbd_dev->header.features = 0;
6764         kfree(rbd_dev->header.object_prefix);
6765         rbd_dev->header.object_prefix = NULL;
6766         return ret;
6767 }
6768
6769 /*
6770  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6771  * rbd_dev_image_probe() recursion depth, which means it's also the
6772  * length of the already discovered part of the parent chain.
6773  */
6774 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6775 {
6776         struct rbd_device *parent = NULL;
6777         int ret;
6778
6779         if (!rbd_dev->parent_spec)
6780                 return 0;
6781
6782         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6783                 pr_info("parent chain is too long (%d)\n", depth);
6784                 ret = -EINVAL;
6785                 goto out_err;
6786         }
6787
6788         parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6789         if (!parent) {
6790                 ret = -ENOMEM;
6791                 goto out_err;
6792         }
6793
6794         /*
6795          * Images related by parent/child relationships always share
6796          * rbd_client and spec/parent_spec, so bump their refcounts.
6797          */
6798         __rbd_get_client(rbd_dev->rbd_client);
6799         rbd_spec_get(rbd_dev->parent_spec);
6800
6801         __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6802
6803         ret = rbd_dev_image_probe(parent, depth);
6804         if (ret < 0)
6805                 goto out_err;
6806
6807         rbd_dev->parent = parent;
6808         atomic_set(&rbd_dev->parent_ref, 1);
6809         return 0;
6810
6811 out_err:
6812         rbd_dev_unparent(rbd_dev);
6813         rbd_dev_destroy(parent);
6814         return ret;
6815 }
6816
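/* Undo the block device setup done by rbd_dev_device_setup(). */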
6817 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6818 {
6819         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6820         rbd_free_disk(rbd_dev);
6821         if (!single_major)
6822                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6823 }
6824
6825 /*
6826  * rbd_dev->header_rwsem must be locked for write and will be unlocked
6827  * upon return.
6828  */
6829 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6830 {
6831         int ret;
6832
6833         /* Record our major and minor device numbers. */
6834
6835         if (!single_major) {
6836                 ret = register_blkdev(0, rbd_dev->name);
6837                 if (ret < 0)
6838                         goto err_out_unlock;
6839
6840                 rbd_dev->major = ret;
6841                 rbd_dev->minor = 0;
6842         } else {
6843                 rbd_dev->major = rbd_major;
6844                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6845         }
6846
6847         /* Set up the blkdev mapping. */
6848
6849         ret = rbd_init_disk(rbd_dev);
6850         if (ret)
6851                 goto err_out_blkdev;
6852
6853         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6854         set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
6855
6856         ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6857         if (ret)
6858                 goto err_out_disk;
6859
6860         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6861         up_write(&rbd_dev->header_rwsem);
6862         return 0;
6863
6864 err_out_disk:
6865         rbd_free_disk(rbd_dev);
6866 err_out_blkdev:
6867         if (!single_major)
6868                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6869 err_out_unlock:
6870         up_write(&rbd_dev->header_rwsem);
6871         return ret;
6872 }
6873
6874 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6875 {
6876         struct rbd_spec *spec = rbd_dev->spec;
6877         int ret;
6878
6879         /* Record the header object name for this rbd image. */
6880
6881         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6882         if (rbd_dev->image_format == 1)
6883                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6884                                        spec->image_name, RBD_SUFFIX);
6885         else
6886                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6887                                        RBD_HEADER_PREFIX, spec->image_id);
6888
6889         return ret;
6890 }
6891
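/* Report that the image (or the requested snapshot of it) does not exist. */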
6892 static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
6893 {
6894         if (!is_snap) {
6895                 pr_info("image %s/%s%s%s does not exist\n",
6896                         rbd_dev->spec->pool_name,
6897                         rbd_dev->spec->pool_ns ?: "",
6898                         rbd_dev->spec->pool_ns ? "/" : "",
6899                         rbd_dev->spec->image_name);
6900         } else {
6901                 pr_info("snap %s/%s%s%s@%s does not exist\n",
6902                         rbd_dev->spec->pool_name,
6903                         rbd_dev->spec->pool_ns ?: "",
6904                         rbd_dev->spec->pool_ns ? "/" : "",
6905                         rbd_dev->spec->image_name,
6906                         rbd_dev->spec->snap_name);
6907         }
6908 }
6909
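/*
 * Undo rbd_dev_image_probe(): drop the probed header state, the
 * header watch (if one was registered) and the recorded image id.
 */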
6910 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6911 {
6912         rbd_dev_unprobe(rbd_dev);
6913         if (rbd_dev->opts)
6914                 rbd_unregister_watch(rbd_dev);
6915         rbd_dev->image_format = 0;
6916         kfree(rbd_dev->spec->image_id);
6917         rbd_dev->spec->image_id = NULL;
6918 }
6919
6920 /*
6921  * Probe for the existence of the header object for the given rbd
6922  * device.  If this image is the one being mapped (i.e., not a
6923  * parent), initiate a watch on its header object before using that
6924  * object to get detailed information about the rbd image.
6925  */
6926 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6927 {
6928         bool need_watch = !rbd_is_ro(rbd_dev);
6929         int ret;
6930
6931         /*
6932          * Get the id from the image id object.  Unless there's an
6933          * error, rbd_dev->spec->image_id will be filled in with
6934          * a dynamically-allocated string, and rbd_dev->image_format
6935          * will be set to either 1 or 2.
6936          */
6937         ret = rbd_dev_image_id(rbd_dev);
6938         if (ret)
6939                 return ret;
6940
6941         ret = rbd_dev_header_name(rbd_dev);
6942         if (ret)
6943                 goto err_out_format;
6944
6945         if (need_watch) {
6946                 ret = rbd_register_watch(rbd_dev);
6947                 if (ret) {
6948                         if (ret == -ENOENT)
6949                                 rbd_print_dne(rbd_dev, false);
6950                         goto err_out_format;
6951                 }
6952         }
6953
6954         ret = rbd_dev_header_info(rbd_dev);
6955         if (ret) {
6956                 if (ret == -ENOENT && !need_watch)
6957                         rbd_print_dne(rbd_dev, false);
6958                 goto err_out_watch;
6959         }
6960
6961         /*
6962          * If this image is the one being mapped, we have pool name and
6963          * id, image name and id, and snap name - need to fill snap id.
6964          * Otherwise this is a parent image, identified by pool, image
6965          * and snap ids - need to fill in names for those ids.
6966          */
6967         if (!depth)
6968                 ret = rbd_spec_fill_snap_id(rbd_dev);
6969         else
6970                 ret = rbd_spec_fill_names(rbd_dev);
6971         if (ret) {
6972                 if (ret == -ENOENT)
6973                         rbd_print_dne(rbd_dev, true);
6974                 goto err_out_probe;
6975         }
6976
6977         ret = rbd_dev_mapping_set(rbd_dev);
6978         if (ret)
6979                 goto err_out_probe;
6980
6981         if (rbd_is_snap(rbd_dev) &&
6982             (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
6983                 ret = rbd_object_map_load(rbd_dev);
6984                 if (ret)
6985                         goto err_out_probe;
6986         }
6987
6988         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6989                 ret = rbd_dev_v2_parent_info(rbd_dev);
6990                 if (ret)
6991                         goto err_out_probe;
6992         }
6993
6994         ret = rbd_dev_probe_parent(rbd_dev, depth);
6995         if (ret)
6996                 goto err_out_probe;
6997
6998         dout("discovered format %u image, header name is %s\n",
6999                 rbd_dev->image_format, rbd_dev->header_oid.name);
7000         return 0;
7001
7002 err_out_probe:
7003         rbd_dev_unprobe(rbd_dev);
7004 err_out_watch:
7005         if (need_watch)
7006                 rbd_unregister_watch(rbd_dev);
7007 err_out_format:
7008         rbd_dev->image_format = 0;
7009         kfree(rbd_dev->spec->image_id);
7010         rbd_dev->spec->image_id = NULL;
7011         return ret;
7012 }
7013
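/*
 * Handle a write to /sys/bus/rbd/add (or add_single_major): parse the
 * arguments, probe the image and, on success, announce the new block
 * device.
 */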
7014 static ssize_t do_rbd_add(struct bus_type *bus,
7015                           const char *buf,
7016                           size_t count)
7017 {
7018         struct rbd_device *rbd_dev = NULL;
7019         struct ceph_options *ceph_opts = NULL;
7020         struct rbd_options *rbd_opts = NULL;
7021         struct rbd_spec *spec = NULL;
7022         struct rbd_client *rbdc;
7023         int rc;
7024
7025         if (!try_module_get(THIS_MODULE))
7026                 return -ENODEV;
7027
7028         /* parse add command */
7029         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
7030         if (rc < 0)
7031                 goto out;
7032
7033         rbdc = rbd_get_client(ceph_opts);
7034         if (IS_ERR(rbdc)) {
7035                 rc = PTR_ERR(rbdc);
7036                 goto err_out_args;
7037         }
7038
7039         /* pick the pool */
7040         rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7041         if (rc < 0) {
7042                 if (rc == -ENOENT)
7043                         pr_info("pool %s does not exist\n", spec->pool_name);
7044                 goto err_out_client;
7045         }
7046         spec->pool_id = (u64)rc;
7047
7048         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7049         if (!rbd_dev) {
7050                 rc = -ENOMEM;
7051                 goto err_out_client;
7052         }
7053         rbdc = NULL;            /* rbd_dev now owns this */
7054         spec = NULL;            /* rbd_dev now owns this */
7055         rbd_opts = NULL;        /* rbd_dev now owns this */
7056
7057         /* if we are mapping a snapshot it will be a read-only mapping */
7058         if (rbd_dev->opts->read_only ||
7059             strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7060                 __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7061
7062         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7063         if (!rbd_dev->config_info) {
7064                 rc = -ENOMEM;
7065                 goto err_out_rbd_dev;
7066         }
7067
7068         down_write(&rbd_dev->header_rwsem);
7069         rc = rbd_dev_image_probe(rbd_dev, 0);
7070         if (rc < 0) {
7071                 up_write(&rbd_dev->header_rwsem);
7072                 goto err_out_rbd_dev;
7073         }
7074
7075         if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7076                 rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7077                          rbd_dev->layout.object_size);
7078                 rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7079         }
7080
7081         rc = rbd_dev_device_setup(rbd_dev);
7082         if (rc)
7083                 goto err_out_image_probe;
7084
7085         rc = rbd_add_acquire_lock(rbd_dev);
7086         if (rc)
7087                 goto err_out_image_lock;
7088
7089         /* Everything's ready.  Announce the disk to the world. */
7090
7091         rc = device_add(&rbd_dev->dev);
7092         if (rc)
7093                 goto err_out_image_lock;
7094
7095         add_disk(rbd_dev->disk);
7096         /* see rbd_init_disk() */
7097         blk_put_queue(rbd_dev->disk->queue);
7098
7099         spin_lock(&rbd_dev_list_lock);
7100         list_add_tail(&rbd_dev->node, &rbd_dev_list);
7101         spin_unlock(&rbd_dev_list_lock);
7102
7103         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7104                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7105                 rbd_dev->header.features);
7106         rc = count;
7107 out:
7108         module_put(THIS_MODULE);
7109         return rc;
7110
7111 err_out_image_lock:
7112         rbd_dev_image_unlock(rbd_dev);
7113         rbd_dev_device_release(rbd_dev);
7114 err_out_image_probe:
7115         rbd_dev_image_release(rbd_dev);
7116 err_out_rbd_dev:
7117         rbd_dev_destroy(rbd_dev);
7118 err_out_client:
7119         rbd_put_client(rbdc);
7120 err_out_args:
7121         rbd_spec_put(spec);
7122         kfree(rbd_opts);
7123         goto out;
7124 }
7125
7126 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
7127 {
7128         if (single_major)
7129                 return -EINVAL;
7130
7131         return do_rbd_add(bus, buf, count);
7132 }
7133
7134 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
7135                                       size_t count)
7136 {
7137         return do_rbd_add(bus, buf, count);
7138 }
7139
7140 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7141 {
7142         while (rbd_dev->parent) {
7143                 struct rbd_device *first = rbd_dev;
7144                 struct rbd_device *second = first->parent;
7145                 struct rbd_device *third;
7146
7147                 /*
7148                  * Walk down to the deepest parent (the one with no
7149                  * grandparent) and remove it.
7150                  */
7151                 while (second && (third = second->parent)) {
7152                         first = second;
7153                         second = third;
7154                 }
7155                 rbd_assert(second);
7156                 rbd_dev_image_release(second);
7157                 rbd_dev_destroy(second);
7158                 first->parent = NULL;
7159                 first->parent_overlap = 0;
7160
7161                 rbd_assert(first->parent_spec);
7162                 rbd_spec_put(first->parent_spec);
7163                 first->parent_spec = NULL;
7164         }
7165 }
7166
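/*
 * Handle a write to /sys/bus/rbd/remove (or remove_single_major):
 * look up the mapped device by id and tear it down, optionally
 * forcing outstanding I/O to fail ("force").
 */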
7167 static ssize_t do_rbd_remove(struct bus_type *bus,
7168                              const char *buf,
7169                              size_t count)
7170 {
7171         struct rbd_device *rbd_dev = NULL;
7172         struct list_head *tmp;
7173         int dev_id;
7174         char opt_buf[6];
7175         bool force = false;
7176         int ret;
7177
7178         dev_id = -1;
7179         opt_buf[0] = '\0';
7180         sscanf(buf, "%d %5s", &dev_id, opt_buf);
7181         if (dev_id < 0) {
7182                 pr_err("dev_id out of range\n");
7183                 return -EINVAL;
7184         }
7185         if (opt_buf[0] != '\0') {
7186                 if (!strcmp(opt_buf, "force")) {
7187                         force = true;
7188                 } else {
7189                         pr_err("bad remove option at '%s'\n", opt_buf);
7190                         return -EINVAL;
7191                 }
7192         }
7193
7194         ret = -ENOENT;
7195         spin_lock(&rbd_dev_list_lock);
7196         list_for_each(tmp, &rbd_dev_list) {
7197                 rbd_dev = list_entry(tmp, struct rbd_device, node);
7198                 if (rbd_dev->dev_id == dev_id) {
7199                         ret = 0;
7200                         break;
7201                 }
7202         }
7203         if (!ret) {
7204                 spin_lock_irq(&rbd_dev->lock);
7205                 if (rbd_dev->open_count && !force)
7206                         ret = -EBUSY;
7207                 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7208                                           &rbd_dev->flags))
7209                         ret = -EINPROGRESS;
7210                 spin_unlock_irq(&rbd_dev->lock);
7211         }
7212         spin_unlock(&rbd_dev_list_lock);
7213         if (ret)
7214                 return ret;
7215
7216         if (force) {
7217                 /*
7218                  * Prevent new IO from being queued and wait for existing
7219                  * IO to complete/fail.
7220                  */
7221                 blk_mq_freeze_queue(rbd_dev->disk->queue);
7222                 blk_set_queue_dying(rbd_dev->disk->queue);
7223         }
7224
7225         del_gendisk(rbd_dev->disk);
7226         spin_lock(&rbd_dev_list_lock);
7227         list_del_init(&rbd_dev->node);
7228         spin_unlock(&rbd_dev_list_lock);
7229         device_del(&rbd_dev->dev);
7230
7231         rbd_dev_image_unlock(rbd_dev);
7232         rbd_dev_device_release(rbd_dev);
7233         rbd_dev_image_release(rbd_dev);
7234         rbd_dev_destroy(rbd_dev);
7235         return count;
7236 }
7237
7238 static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
7239 {
7240         if (single_major)
7241                 return -EINVAL;
7242
7243         return do_rbd_remove(bus, buf, count);
7244 }
7245
7246 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
7247                                          size_t count)
7248 {
7249         return do_rbd_remove(bus, buf, count);
7250 }
7251
7252 /*
7253  * Create the rbd control files in sysfs:
7254  * /sys/bus/rbd/...
7255  */
7256 static int __init rbd_sysfs_init(void)
7257 {
7258         int ret;
7259
7260         ret = device_register(&rbd_root_dev);
7261         if (ret < 0)
7262                 return ret;
7263
7264         ret = bus_register(&rbd_bus_type);
7265         if (ret < 0)
7266                 device_unregister(&rbd_root_dev);
7267
7268         return ret;
7269 }
7270
7271 static void __exit rbd_sysfs_cleanup(void)
7272 {
7273         bus_unregister(&rbd_bus_type);
7274         device_unregister(&rbd_root_dev);
7275 }
7276
7277 static int __init rbd_slab_init(void)
7278 {
7279         rbd_assert(!rbd_img_request_cache);
7280         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7281         if (!rbd_img_request_cache)
7282                 return -ENOMEM;
7283
7284         rbd_assert(!rbd_obj_request_cache);
7285         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7286         if (!rbd_obj_request_cache)
7287                 goto out_err;
7288
7289         return 0;
7290
7291 out_err:
7292         kmem_cache_destroy(rbd_img_request_cache);
7293         rbd_img_request_cache = NULL;
7294         return -ENOMEM;
7295 }
7296
7297 static void rbd_slab_exit(void)
7298 {
7299         rbd_assert(rbd_obj_request_cache);
7300         kmem_cache_destroy(rbd_obj_request_cache);
7301         rbd_obj_request_cache = NULL;
7302
7303         rbd_assert(rbd_img_request_cache);
7304         kmem_cache_destroy(rbd_img_request_cache);
7305         rbd_img_request_cache = NULL;
7306 }
7307
7308 static int __init rbd_init(void)
7309 {
7310         int rc;
7311
7312         if (!libceph_compatible(NULL)) {
7313                 rbd_warn(NULL, "libceph incompatibility (quitting)");
7314                 return -EINVAL;
7315         }
7316
7317         rc = rbd_slab_init();
7318         if (rc)
7319                 return rc;
7320
7321         /*
7322          * The number of active work items is limited by the number of
7323          * rbd devices * queue depth, so leave @max_active at default.
7324          */
7325         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7326         if (!rbd_wq) {
7327                 rc = -ENOMEM;
7328                 goto err_out_slab;
7329         }
7330
7331         if (single_major) {
7332                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
7333                 if (rbd_major < 0) {
7334                         rc = rbd_major;
7335                         goto err_out_wq;
7336                 }
7337         }
7338
7339         rc = rbd_sysfs_init();
7340         if (rc)
7341                 goto err_out_blkdev;
7342
7343         if (single_major)
7344                 pr_info("loaded (major %d)\n", rbd_major);
7345         else
7346                 pr_info("loaded\n");
7347
7348         return 0;
7349
7350 err_out_blkdev:
7351         if (single_major)
7352                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7353 err_out_wq:
7354         destroy_workqueue(rbd_wq);
7355 err_out_slab:
7356         rbd_slab_exit();
7357         return rc;
7358 }
7359
7360 static void __exit rbd_exit(void)
7361 {
7362         ida_destroy(&rbd_dev_id_ida);
7363         rbd_sysfs_cleanup();
7364         if (single_major)
7365                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7366         destroy_workqueue(rbd_wq);
7367         rbd_slab_exit();
7368 }
7369
7370 module_init(rbd_init);
7371 module_exit(rbd_exit);
7372
7373 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7374 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7375 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7376 /* following authorship retained from original osdblk.c */
7377 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7378
7379 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7380 MODULE_LICENSE("GPL");