drivers/block/rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
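/*
 * Rough usage sketch (see the ABI document above for the authoritative
 * format; the monitor address, credentials, pool and image names below
 * are placeholders, not a tested invocation):
 *
 *   # map a pool/image pair, which shows up as /dev/rbd<N>
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> mypool myimage" \
 *         > /sys/bus/rbd/add
 *
 *   # unmap the device with id <N>
 *   $ echo <N> > /sys/bus/rbd/remove
 */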
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/striper.h>
36 #include <linux/ceph/decode.h>
37 #include <linux/fs_parser.h>
38 #include <linux/bsearch.h>
39
40 #include <linux/kernel.h>
41 #include <linux/device.h>
42 #include <linux/module.h>
43 #include <linux/blk-mq.h>
44 #include <linux/fs.h>
45 #include <linux/blkdev.h>
46 #include <linux/slab.h>
47 #include <linux/idr.h>
48 #include <linux/workqueue.h>
49
50 #include "rbd_types.h"
51
52 #define RBD_DEBUG       /* Activate rbd_assert() calls */
53
54 /*
55  * Increment the given counter and return its updated value.
56  * If the counter is already 0 it will not be incremented.
57  * If the counter is already at its maximum value, -EINVAL is
58  * returned without updating it.
59  */
60 static int atomic_inc_return_safe(atomic_t *v)
61 {
62         unsigned int counter;
63
64         counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
65         if (counter <= (unsigned int)INT_MAX)
66                 return (int)counter;
67
68         atomic_dec(v);
69
70         return -EINVAL;
71 }
72
73 /* Decrement the counter.  Return the resulting value, or -EINVAL */
74 static int atomic_dec_return_safe(atomic_t *v)
75 {
76         int counter;
77
78         counter = atomic_dec_return(v);
79         if (counter >= 0)
80                 return counter;
81
82         atomic_inc(v);
83
84         return -EINVAL;
85 }
86
87 #define RBD_DRV_NAME "rbd"
88
89 #define RBD_MINORS_PER_MAJOR            256
90 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
91
92 #define RBD_MAX_PARENT_CHAIN_LEN        16
93
94 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
95 #define RBD_MAX_SNAP_NAME_LEN   \
96                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
97
98 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
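/*
 * Rough accounting for the 510 limit above: 510 snapshot ids at
 * 8 bytes each is 4080 bytes, which together with the small snapshot
 * context header (sequence number and count) stays within a single
 * 4KB page.
 */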
99
100 #define RBD_SNAP_HEAD_NAME      "-"
101
102 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
103
104 /* This allows a single page to hold an image name sent by OSD */
105 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
106 #define RBD_IMAGE_ID_LEN_MAX    64
107
108 #define RBD_OBJ_PREFIX_LEN_MAX  64
109
110 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
111 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
112
113 /* Feature bits */
114
115 #define RBD_FEATURE_LAYERING            (1ULL<<0)
116 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
117 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
118 #define RBD_FEATURE_OBJECT_MAP          (1ULL<<3)
119 #define RBD_FEATURE_FAST_DIFF           (1ULL<<4)
120 #define RBD_FEATURE_DEEP_FLATTEN        (1ULL<<5)
121 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
122 #define RBD_FEATURE_OPERATIONS          (1ULL<<8)
123
124 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
125                                  RBD_FEATURE_STRIPINGV2 |       \
126                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
127                                  RBD_FEATURE_OBJECT_MAP |       \
128                                  RBD_FEATURE_FAST_DIFF |        \
129                                  RBD_FEATURE_DEEP_FLATTEN |     \
130                                  RBD_FEATURE_DATA_POOL |        \
131                                  RBD_FEATURE_OPERATIONS)
132
133 /* Features supported by this (client software) implementation. */
134
135 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
136
137 /*
138  * An RBD device name will be "rbd#", where the "rbd" comes from
139  * RBD_DRV_NAME above, and # is a unique integer identifier.
140  */
141 #define DEV_NAME_LEN            32
142
143 /*
144  * block device image metadata (in-memory version)
145  */
146 struct rbd_image_header {
147         /* These six fields never change for a given rbd image */
148         char *object_prefix;
149         __u8 obj_order;
150         u64 stripe_unit;
151         u64 stripe_count;
152         s64 data_pool_id;
153         u64 features;           /* Might be changeable someday? */
154
155         /* The remaining fields need to be updated occasionally */
156         u64 image_size;
157         struct ceph_snap_context *snapc;
158         char *snap_names;       /* format 1 only */
159         u64 *snap_sizes;        /* format 1 only */
160 };
161
162 /*
163  * An rbd image specification.
164  *
165  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
166  * identify an image.  Each rbd_dev structure includes a pointer to
167  * an rbd_spec structure that encapsulates this identity.
168  *
169  * Each of the id's in an rbd_spec has an associated name.  For a
170  * user-mapped image, the names are supplied and the id's associated
171  * with them are looked up.  For a layered image, a parent image is
172  * defined by the tuple, and the names are looked up.
173  *
174  * An rbd_dev structure contains a parent_spec pointer which is
175  * non-null if the image it represents is a child in a layered
176  * image.  This pointer will refer to the rbd_spec structure used
177  * by the parent rbd_dev for its own identity (i.e., the structure
178  * is shared between the parent and child).
179  *
180  * Since these structures are populated once, during the discovery
181  * phase of image construction, they are effectively immutable so
182  * we make no effort to synchronize access to them.
183  *
184  * Note that code herein does not assume the image name is known (it
185  * could be a null pointer).
186  */
187 struct rbd_spec {
188         u64             pool_id;
189         const char      *pool_name;
190         const char      *pool_ns;       /* NULL if default, never "" */
191
192         const char      *image_id;
193         const char      *image_name;
194
195         u64             snap_id;
196         const char      *snap_name;
197
198         struct kref     kref;
199 };
200
201 /*
202  * an instance of the client.  multiple devices may share an rbd client.
203  */
204 struct rbd_client {
205         struct ceph_client      *client;
206         struct kref             kref;
207         struct list_head        node;
208 };
209
210 struct pending_result {
211         int                     result;         /* first nonzero result */
212         int                     num_pending;
213 };
214
215 struct rbd_img_request;
216
217 enum obj_request_type {
218         OBJ_REQUEST_NODATA = 1,
219         OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
220         OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
221         OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
222 };
223
224 enum obj_operation_type {
225         OBJ_OP_READ = 1,
226         OBJ_OP_WRITE,
227         OBJ_OP_DISCARD,
228         OBJ_OP_ZEROOUT,
229 };
230
231 #define RBD_OBJ_FLAG_DELETION                   (1U << 0)
232 #define RBD_OBJ_FLAG_COPYUP_ENABLED             (1U << 1)
233 #define RBD_OBJ_FLAG_COPYUP_ZEROS               (1U << 2)
234 #define RBD_OBJ_FLAG_MAY_EXIST                  (1U << 3)
235 #define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT       (1U << 4)
236
237 enum rbd_obj_read_state {
238         RBD_OBJ_READ_START = 1,
239         RBD_OBJ_READ_OBJECT,
240         RBD_OBJ_READ_PARENT,
241 };
242
243 /*
244  * Writes go through the following state machine to deal with
245  * layering:
246  *
247  *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
248  *            .                 |                                    .
249  *            .                 v                                    .
250  *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
251  *            .                 |                    .               .
252  *            .                 v                    v (deep-copyup  .
253  *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
254  * flattened) v                 |                    .               .
255  *            .                 v                    .               .
256  *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
257  *                              |                        not needed) v
258  *                              v                                    .
259  *                            done . . . . . . . . . . . . . . . . . .
260  *                              ^
261  *                              |
262  *                     RBD_OBJ_WRITE_FLAT
263  *
264  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
265  * assert_exists guard is needed or not (in some cases it's not needed
266  * even if there is a parent).
267  */
268 enum rbd_obj_write_state {
269         RBD_OBJ_WRITE_START = 1,
270         RBD_OBJ_WRITE_PRE_OBJECT_MAP,
271         RBD_OBJ_WRITE_OBJECT,
272         __RBD_OBJ_WRITE_COPYUP,
273         RBD_OBJ_WRITE_COPYUP,
274         RBD_OBJ_WRITE_POST_OBJECT_MAP,
275 };
276
277 enum rbd_obj_copyup_state {
278         RBD_OBJ_COPYUP_START = 1,
279         RBD_OBJ_COPYUP_READ_PARENT,
280         __RBD_OBJ_COPYUP_OBJECT_MAPS,
281         RBD_OBJ_COPYUP_OBJECT_MAPS,
282         __RBD_OBJ_COPYUP_WRITE_OBJECT,
283         RBD_OBJ_COPYUP_WRITE_OBJECT,
284 };
285
286 struct rbd_obj_request {
287         struct ceph_object_extent ex;
288         unsigned int            flags;  /* RBD_OBJ_FLAG_* */
289         union {
290                 enum rbd_obj_read_state  read_state;    /* for reads */
291                 enum rbd_obj_write_state write_state;   /* for writes */
292         };
293
294         struct rbd_img_request  *img_request;
295         struct ceph_file_extent *img_extents;
296         u32                     num_img_extents;
297
298         union {
299                 struct ceph_bio_iter    bio_pos;
300                 struct {
301                         struct ceph_bvec_iter   bvec_pos;
302                         u32                     bvec_count;
303                         u32                     bvec_idx;
304                 };
305         };
306
307         enum rbd_obj_copyup_state copyup_state;
308         struct bio_vec          *copyup_bvecs;
309         u32                     copyup_bvec_count;
310
311         struct list_head        osd_reqs;       /* w/ r_private_item */
312
313         struct mutex            state_mutex;
314         struct pending_result   pending;
315         struct kref             kref;
316 };
317
318 enum img_req_flags {
319         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
320         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
321 };
322
323 enum rbd_img_state {
324         RBD_IMG_START = 1,
325         RBD_IMG_EXCLUSIVE_LOCK,
326         __RBD_IMG_OBJECT_REQUESTS,
327         RBD_IMG_OBJECT_REQUESTS,
328 };
329
330 struct rbd_img_request {
331         struct rbd_device       *rbd_dev;
332         enum obj_operation_type op_type;
333         enum obj_request_type   data_type;
334         unsigned long           flags;
335         enum rbd_img_state      state;
336         union {
337                 u64                     snap_id;        /* for reads */
338                 struct ceph_snap_context *snapc;        /* for writes */
339         };
340         union {
341                 struct request          *rq;            /* block request */
342                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
343         };
344
345         struct list_head        lock_item;
346         struct list_head        object_extents; /* obj_req.ex structs */
347
348         struct mutex            state_mutex;
349         struct pending_result   pending;
350         struct work_struct      work;
351         int                     work_result;
352         struct kref             kref;
353 };
354
355 #define for_each_obj_request(ireq, oreq) \
356         list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
357 #define for_each_obj_request_safe(ireq, oreq, n) \
358         list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
359
360 enum rbd_watch_state {
361         RBD_WATCH_STATE_UNREGISTERED,
362         RBD_WATCH_STATE_REGISTERED,
363         RBD_WATCH_STATE_ERROR,
364 };
365
366 enum rbd_lock_state {
367         RBD_LOCK_STATE_UNLOCKED,
368         RBD_LOCK_STATE_LOCKED,
369         RBD_LOCK_STATE_RELEASING,
370 };
371
372 /* WatchNotify::ClientId */
373 struct rbd_client_id {
374         u64 gid;
375         u64 handle;
376 };
377
378 struct rbd_mapping {
379         u64                     size;
380 };
381
382 /*
383  * a single device
384  */
385 struct rbd_device {
386         int                     dev_id;         /* blkdev unique id */
387
388         int                     major;          /* blkdev assigned major */
389         int                     minor;
390         struct gendisk          *disk;          /* blkdev's gendisk and rq */
391
392         u32                     image_format;   /* Either 1 or 2 */
393         struct rbd_client       *rbd_client;
394
395         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
396
397         spinlock_t              lock;           /* queue, flags, open_count */
398
399         struct rbd_image_header header;
400         unsigned long           flags;          /* possibly lock protected */
401         struct rbd_spec         *spec;
402         struct rbd_options      *opts;
403         char                    *config_info;   /* add{,_single_major} string */
404
405         struct ceph_object_id   header_oid;
406         struct ceph_object_locator header_oloc;
407
408         struct ceph_file_layout layout;         /* used for all rbd requests */
409
410         struct mutex            watch_mutex;
411         enum rbd_watch_state    watch_state;
412         struct ceph_osd_linger_request *watch_handle;
413         u64                     watch_cookie;
414         struct delayed_work     watch_dwork;
415
416         struct rw_semaphore     lock_rwsem;
417         enum rbd_lock_state     lock_state;
418         char                    lock_cookie[32];
419         struct rbd_client_id    owner_cid;
420         struct work_struct      acquired_lock_work;
421         struct work_struct      released_lock_work;
422         struct delayed_work     lock_dwork;
423         struct work_struct      unlock_work;
424         spinlock_t              lock_lists_lock;
425         struct list_head        acquiring_list;
426         struct list_head        running_list;
427         struct completion       acquire_wait;
428         int                     acquire_err;
429         struct completion       releasing_wait;
430
431         spinlock_t              object_map_lock;
432         u8                      *object_map;
433         u64                     object_map_size;        /* in objects */
434         u64                     object_map_flags;
435
436         struct workqueue_struct *task_wq;
437
438         struct rbd_spec         *parent_spec;
439         u64                     parent_overlap;
440         atomic_t                parent_ref;
441         struct rbd_device       *parent;
442
443         /* Block layer tags. */
444         struct blk_mq_tag_set   tag_set;
445
446         /* protects updating the header */
447         struct rw_semaphore     header_rwsem;
448
449         struct rbd_mapping      mapping;
450
451         struct list_head        node;
452
453         /* sysfs related */
454         struct device           dev;
455         unsigned long           open_count;     /* protected by lock */
456 };
457
458 /*
459  * Flag bits for rbd_dev->flags:
460  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
461  *   by rbd_dev->lock
462  */
463 enum rbd_dev_flags {
464         RBD_DEV_FLAG_EXISTS,    /* rbd_dev_device_setup() ran */
465         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
466         RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
467 };
468
469 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
470
471 static LIST_HEAD(rbd_dev_list);    /* devices */
472 static DEFINE_SPINLOCK(rbd_dev_list_lock);
473
474 static LIST_HEAD(rbd_client_list);              /* clients */
475 static DEFINE_SPINLOCK(rbd_client_list_lock);
476
477 /* Slab caches for frequently-allocated structures */
478
479 static struct kmem_cache        *rbd_img_request_cache;
480 static struct kmem_cache        *rbd_obj_request_cache;
481
482 static int rbd_major;
483 static DEFINE_IDA(rbd_dev_id_ida);
484
485 static struct workqueue_struct *rbd_wq;
486
487 static struct ceph_snap_context rbd_empty_snapc = {
488         .nref = REFCOUNT_INIT(1),
489 };
490
491 /*
492  * single-major requires version >= 0.75 of the userspace rbd utility.
493  */
494 static bool single_major = true;
495 module_param(single_major, bool, 0444);
496 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
497
498 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
499 static ssize_t remove_store(struct bus_type *bus, const char *buf,
500                             size_t count);
501 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
502                                       size_t count);
503 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
504                                          size_t count);
505 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
506
507 static int rbd_dev_id_to_minor(int dev_id)
508 {
509         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
510 }
511
512 static int minor_to_rbd_dev_id(int minor)
513 {
514         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
515 }
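/*
 * Example of the dev_id <-> minor mapping above: with
 * RBD_SINGLE_MAJOR_PART_SHIFT == 4 each device reserves 1 << 4 = 16
 * minors for partitions, so dev_id 2 maps to minor 32 and minors
 * 32..47 all map back to dev_id 2.
 */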
516
517 static bool rbd_is_ro(struct rbd_device *rbd_dev)
518 {
519         return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
520 }
521
522 static bool rbd_is_snap(struct rbd_device *rbd_dev)
523 {
524         return rbd_dev->spec->snap_id != CEPH_NOSNAP;
525 }
526
527 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
528 {
529         lockdep_assert_held(&rbd_dev->lock_rwsem);
530
531         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
532                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
533 }
534
535 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
536 {
537         bool is_lock_owner;
538
539         down_read(&rbd_dev->lock_rwsem);
540         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
541         up_read(&rbd_dev->lock_rwsem);
542         return is_lock_owner;
543 }
544
545 static ssize_t supported_features_show(struct bus_type *bus, char *buf)
546 {
547         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
548 }
549
550 static BUS_ATTR_WO(add);
551 static BUS_ATTR_WO(remove);
552 static BUS_ATTR_WO(add_single_major);
553 static BUS_ATTR_WO(remove_single_major);
554 static BUS_ATTR_RO(supported_features);
555
556 static struct attribute *rbd_bus_attrs[] = {
557         &bus_attr_add.attr,
558         &bus_attr_remove.attr,
559         &bus_attr_add_single_major.attr,
560         &bus_attr_remove_single_major.attr,
561         &bus_attr_supported_features.attr,
562         NULL,
563 };
564
565 static umode_t rbd_bus_is_visible(struct kobject *kobj,
566                                   struct attribute *attr, int index)
567 {
568         if (!single_major &&
569             (attr == &bus_attr_add_single_major.attr ||
570              attr == &bus_attr_remove_single_major.attr))
571                 return 0;
572
573         return attr->mode;
574 }
575
576 static const struct attribute_group rbd_bus_group = {
577         .attrs = rbd_bus_attrs,
578         .is_visible = rbd_bus_is_visible,
579 };
580 __ATTRIBUTE_GROUPS(rbd_bus);
581
582 static struct bus_type rbd_bus_type = {
583         .name           = "rbd",
584         .bus_groups     = rbd_bus_groups,
585 };
586
587 static void rbd_root_dev_release(struct device *dev)
588 {
589 }
590
591 static struct device rbd_root_dev = {
592         .init_name =    "rbd",
593         .release =      rbd_root_dev_release,
594 };
595
596 static __printf(2, 3)
597 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
598 {
599         struct va_format vaf;
600         va_list args;
601
602         va_start(args, fmt);
603         vaf.fmt = fmt;
604         vaf.va = &args;
605
606         if (!rbd_dev)
607                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
608         else if (rbd_dev->disk)
609                 printk(KERN_WARNING "%s: %s: %pV\n",
610                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
611         else if (rbd_dev->spec && rbd_dev->spec->image_name)
612                 printk(KERN_WARNING "%s: image %s: %pV\n",
613                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
614         else if (rbd_dev->spec && rbd_dev->spec->image_id)
615                 printk(KERN_WARNING "%s: id %s: %pV\n",
616                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
617         else    /* punt */
618                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
619                         RBD_DRV_NAME, rbd_dev, &vaf);
620         va_end(args);
621 }
622
623 #ifdef RBD_DEBUG
624 #define rbd_assert(expr)                                                \
625                 if (unlikely(!(expr))) {                                \
626                         printk(KERN_ERR "\nAssertion failure in %s() "  \
627                                                 "at line %d:\n\n"       \
628                                         "\trbd_assert(%s);\n\n",        \
629                                         __func__, __LINE__, #expr);     \
630                         BUG();                                          \
631                 }
632 #else /* !RBD_DEBUG */
633 #  define rbd_assert(expr)      ((void) 0)
634 #endif /* !RBD_DEBUG */
635
636 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
637
638 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
639 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
640 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
641 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
642 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
643                                         u64 snap_id);
644 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
645                                 u8 *order, u64 *snap_size);
646 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
647
648 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
649 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
650
651 /*
652  * Return true if nothing else is pending.
653  */
654 static bool pending_result_dec(struct pending_result *pending, int *result)
655 {
656         rbd_assert(pending->num_pending > 0);
657
658         if (*result && !pending->result)
659                 pending->result = *result;
660         if (--pending->num_pending)
661                 return false;
662
663         *result = pending->result;
664         return true;
665 }
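/*
 * Example of the bookkeeping above: with num_pending == 3 and
 * per-request results 0, -ENOMEM and 0 (in any order), the first two
 * calls return false and the third returns true with *result set to
 * -ENOMEM, the first (and only) nonzero result.
 */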
666
667 static int rbd_open(struct block_device *bdev, fmode_t mode)
668 {
669         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
670         bool removing = false;
671
672         spin_lock_irq(&rbd_dev->lock);
673         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
674                 removing = true;
675         else
676                 rbd_dev->open_count++;
677         spin_unlock_irq(&rbd_dev->lock);
678         if (removing)
679                 return -ENOENT;
680
681         (void) get_device(&rbd_dev->dev);
682
683         return 0;
684 }
685
686 static void rbd_release(struct gendisk *disk, fmode_t mode)
687 {
688         struct rbd_device *rbd_dev = disk->private_data;
689         unsigned long open_count_before;
690
691         spin_lock_irq(&rbd_dev->lock);
692         open_count_before = rbd_dev->open_count--;
693         spin_unlock_irq(&rbd_dev->lock);
694         rbd_assert(open_count_before > 0);
695
696         put_device(&rbd_dev->dev);
697 }
698
699 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
700 {
701         int ro;
702
703         if (get_user(ro, (int __user *)arg))
704                 return -EFAULT;
705
706         /*
707          * Neither images mapped read-only nor snapshots can be
708          * marked read-write.
709          */
710         if (!ro) {
711                 if (rbd_is_ro(rbd_dev))
712                         return -EROFS;
713
714                 rbd_assert(!rbd_is_snap(rbd_dev));
715         }
716
717         /* Let blkdev_roset() handle it */
718         return -ENOTTY;
719 }
720
721 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
722                         unsigned int cmd, unsigned long arg)
723 {
724         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
725         int ret;
726
727         switch (cmd) {
728         case BLKROSET:
729                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
730                 break;
731         default:
732                 ret = -ENOTTY;
733         }
734
735         return ret;
736 }
737
738 #ifdef CONFIG_COMPAT
739 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
740                                 unsigned int cmd, unsigned long arg)
741 {
742         return rbd_ioctl(bdev, mode, cmd, arg);
743 }
744 #endif /* CONFIG_COMPAT */
745
746 static const struct block_device_operations rbd_bd_ops = {
747         .owner                  = THIS_MODULE,
748         .open                   = rbd_open,
749         .release                = rbd_release,
750         .ioctl                  = rbd_ioctl,
751 #ifdef CONFIG_COMPAT
752         .compat_ioctl           = rbd_compat_ioctl,
753 #endif
754 };
755
756 /*
757  * Initialize an rbd client instance.  Success or not, this function
758  * consumes ceph_opts.  Caller holds client_mutex.
759  */
760 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
761 {
762         struct rbd_client *rbdc;
763         int ret = -ENOMEM;
764
765         dout("%s:\n", __func__);
766         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
767         if (!rbdc)
768                 goto out_opt;
769
770         kref_init(&rbdc->kref);
771         INIT_LIST_HEAD(&rbdc->node);
772
773         rbdc->client = ceph_create_client(ceph_opts, rbdc);
774         if (IS_ERR(rbdc->client))
775                 goto out_rbdc;
776         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
777
778         ret = ceph_open_session(rbdc->client);
779         if (ret < 0)
780                 goto out_client;
781
782         spin_lock(&rbd_client_list_lock);
783         list_add_tail(&rbdc->node, &rbd_client_list);
784         spin_unlock(&rbd_client_list_lock);
785
786         dout("%s: rbdc %p\n", __func__, rbdc);
787
788         return rbdc;
789 out_client:
790         ceph_destroy_client(rbdc->client);
791 out_rbdc:
792         kfree(rbdc);
793 out_opt:
794         if (ceph_opts)
795                 ceph_destroy_options(ceph_opts);
796         dout("%s: error %d\n", __func__, ret);
797
798         return ERR_PTR(ret);
799 }
800
801 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
802 {
803         kref_get(&rbdc->kref);
804
805         return rbdc;
806 }
807
808 /*
809  * Find a ceph client with specific addr and configuration.  If
810  * found, bump its reference count.
811  */
812 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
813 {
814         struct rbd_client *client_node;
815         bool found = false;
816
817         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
818                 return NULL;
819
820         spin_lock(&rbd_client_list_lock);
821         list_for_each_entry(client_node, &rbd_client_list, node) {
822                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
823                         __rbd_get_client(client_node);
824
825                         found = true;
826                         break;
827                 }
828         }
829         spin_unlock(&rbd_client_list_lock);
830
831         return found ? client_node : NULL;
832 }
833
834 /*
835  * (Per device) rbd map options
836  */
837 enum {
838         Opt_queue_depth,
839         Opt_alloc_size,
840         Opt_lock_timeout,
841         /* int args above */
842         Opt_pool_ns,
843         /* string args above */
844         Opt_read_only,
845         Opt_read_write,
846         Opt_lock_on_read,
847         Opt_exclusive,
848         Opt_notrim,
849 };
850
851 static const struct fs_parameter_spec rbd_parameters[] = {
852         fsparam_u32     ("alloc_size",                  Opt_alloc_size),
853         fsparam_flag    ("exclusive",                   Opt_exclusive),
854         fsparam_flag    ("lock_on_read",                Opt_lock_on_read),
855         fsparam_u32     ("lock_timeout",                Opt_lock_timeout),
856         fsparam_flag    ("notrim",                      Opt_notrim),
857         fsparam_string  ("_pool_ns",                    Opt_pool_ns),
858         fsparam_u32     ("queue_depth",                 Opt_queue_depth),
859         fsparam_flag    ("read_only",                   Opt_read_only),
860         fsparam_flag    ("read_write",                  Opt_read_write),
861         fsparam_flag    ("ro",                          Opt_read_only),
862         fsparam_flag    ("rw",                          Opt_read_write),
863         {}
864 };
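/*
 * Illustrative option string (names follow the table above, the values
 * are arbitrary).  These per-mapping options are part of the string
 * written to /sys/bus/rbd/add{,_single_major}, e.g.:
 *
 *   queue_depth=128,alloc_size=65536,lock_on_read,ro
 */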
865
866 struct rbd_options {
867         int     queue_depth;
868         int     alloc_size;
869         unsigned long   lock_timeout;
870         bool    read_only;
871         bool    lock_on_read;
872         bool    exclusive;
873         bool    trim;
874 };
875
876 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
877 #define RBD_ALLOC_SIZE_DEFAULT  (64 * 1024)
878 #define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
879 #define RBD_READ_ONLY_DEFAULT   false
880 #define RBD_LOCK_ON_READ_DEFAULT false
881 #define RBD_EXCLUSIVE_DEFAULT   false
882 #define RBD_TRIM_DEFAULT        true
883
884 struct rbd_parse_opts_ctx {
885         struct rbd_spec         *spec;
886         struct ceph_options     *copts;
887         struct rbd_options      *opts;
888 };
889
890 static char *obj_op_name(enum obj_operation_type op_type)
891 {
892         switch (op_type) {
893         case OBJ_OP_READ:
894                 return "read";
895         case OBJ_OP_WRITE:
896                 return "write";
897         case OBJ_OP_DISCARD:
898                 return "discard";
899         case OBJ_OP_ZEROOUT:
900                 return "zeroout";
901         default:
902                 return "???";
903         }
904 }
905
906 /*
907  * Destroy ceph client
908  *
909  * Takes rbd_client_list_lock itself, so the caller must not hold it.
910  */
911 static void rbd_client_release(struct kref *kref)
912 {
913         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
914
915         dout("%s: rbdc %p\n", __func__, rbdc);
916         spin_lock(&rbd_client_list_lock);
917         list_del(&rbdc->node);
918         spin_unlock(&rbd_client_list_lock);
919
920         ceph_destroy_client(rbdc->client);
921         kfree(rbdc);
922 }
923
924 /*
925  * Drop reference to ceph client node. If it's not referenced anymore, release
926  * it.
927  */
928 static void rbd_put_client(struct rbd_client *rbdc)
929 {
930         if (rbdc)
931                 kref_put(&rbdc->kref, rbd_client_release);
932 }
933
934 /*
935  * Get a ceph client with specific addr and configuration; if one does
936  * not exist, create it.  Either way, ceph_opts is consumed by this
937  * function.
938  */
939 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
940 {
941         struct rbd_client *rbdc;
942         int ret;
943
944         mutex_lock(&client_mutex);
945         rbdc = rbd_client_find(ceph_opts);
946         if (rbdc) {
947                 ceph_destroy_options(ceph_opts);
948
949                 /*
950                  * Using an existing client.  Make sure ->pg_pools is up to
951                  * date before we look up the pool id in do_rbd_add().
952                  */
953                 ret = ceph_wait_for_latest_osdmap(rbdc->client,
954                                         rbdc->client->options->mount_timeout);
955                 if (ret) {
956                         rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
957                         rbd_put_client(rbdc);
958                         rbdc = ERR_PTR(ret);
959                 }
960         } else {
961                 rbdc = rbd_client_create(ceph_opts);
962         }
963         mutex_unlock(&client_mutex);
964
965         return rbdc;
966 }
967
968 static bool rbd_image_format_valid(u32 image_format)
969 {
970         return image_format == 1 || image_format == 2;
971 }
972
973 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
974 {
975         size_t size;
976         u32 snap_count;
977
978         /* The header has to start with the magic rbd header text */
979         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
980                 return false;
981
982         /* The bio layer requires at least sector-sized I/O */
983
984         if (ondisk->options.order < SECTOR_SHIFT)
985                 return false;
986
987         /* If we use u64 in a few spots we may be able to loosen this */
988
989         if (ondisk->options.order > 8 * sizeof (int) - 1)
990                 return false;
991
992         /*
993          * The size of a snapshot header has to fit in a size_t, and
994          * that limits the number of snapshots.
995          */
996         snap_count = le32_to_cpu(ondisk->snap_count);
997         size = SIZE_MAX - sizeof (struct ceph_snap_context);
998         if (snap_count > size / sizeof (__le64))
999                 return false;
1000
1001         /*
1002          * Not only that, but the size of the entire snapshot
1003          * header must also be representable in a size_t.
1004          */
1005         size -= snap_count * sizeof (__le64);
1006         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
1007                 return false;
1008
1009         return true;
1010 }
1011
1012 /*
1013  * returns the size of an object in the image
1014  */
1015 static u32 rbd_obj_bytes(struct rbd_image_header *header)
1016 {
1017         return 1U << header->obj_order;
1018 }
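/*
 * For example, the common default of obj_order == 22 gives
 * 1U << 22 = 4 MiB objects.
 */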
1019
1020 static void rbd_init_layout(struct rbd_device *rbd_dev)
1021 {
1022         if (rbd_dev->header.stripe_unit == 0 ||
1023             rbd_dev->header.stripe_count == 0) {
1024                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
1025                 rbd_dev->header.stripe_count = 1;
1026         }
1027
1028         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1029         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1030         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1031         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1032                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1033         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1034 }
1035
1036 /*
1037  * Fill an rbd image header with information from the given format 1
1038  * on-disk header.
1039  */
1040 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1041                                  struct rbd_image_header_ondisk *ondisk)
1042 {
1043         struct rbd_image_header *header = &rbd_dev->header;
1044         bool first_time = header->object_prefix == NULL;
1045         struct ceph_snap_context *snapc;
1046         char *object_prefix = NULL;
1047         char *snap_names = NULL;
1048         u64 *snap_sizes = NULL;
1049         u32 snap_count;
1050         int ret = -ENOMEM;
1051         u32 i;
1052
1053         /* Allocate this now to avoid having to handle failure below */
1054
1055         if (first_time) {
1056                 object_prefix = kstrndup(ondisk->object_prefix,
1057                                          sizeof(ondisk->object_prefix),
1058                                          GFP_KERNEL);
1059                 if (!object_prefix)
1060                         return -ENOMEM;
1061         }
1062
1063         /* Allocate the snapshot context and fill it in */
1064
1065         snap_count = le32_to_cpu(ondisk->snap_count);
1066         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1067         if (!snapc)
1068                 goto out_err;
1069         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1070         if (snap_count) {
1071                 struct rbd_image_snap_ondisk *snaps;
1072                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1073
1074                 /* We'll keep a copy of the snapshot names... */
1075
1076                 if (snap_names_len > (u64)SIZE_MAX)
1077                         goto out_2big;
1078                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1079                 if (!snap_names)
1080                         goto out_err;
1081
1082                 /* ...as well as the array of their sizes. */
1083                 snap_sizes = kmalloc_array(snap_count,
1084                                            sizeof(*header->snap_sizes),
1085                                            GFP_KERNEL);
1086                 if (!snap_sizes)
1087                         goto out_err;
1088
1089                 /*
1090                  * Copy the names, and fill in each snapshot's id
1091                  * and size.
1092                  *
1093                  * Note that rbd_dev_v1_header_info() guarantees the
1094                  * ondisk buffer we're working with has
1095                  * snap_names_len bytes beyond the end of the
1096                  * snapshot id array, so this memcpy() is safe.
1097                  */
1098                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1099                 snaps = ondisk->snaps;
1100                 for (i = 0; i < snap_count; i++) {
1101                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1102                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1103                 }
1104         }
1105
1106         /* We won't fail any more; fill in the header */
1107
1108         if (first_time) {
1109                 header->object_prefix = object_prefix;
1110                 header->obj_order = ondisk->options.order;
1111                 rbd_init_layout(rbd_dev);
1112         } else {
1113                 ceph_put_snap_context(header->snapc);
1114                 kfree(header->snap_names);
1115                 kfree(header->snap_sizes);
1116         }
1117
1118         /* The remaining fields always get updated (when we refresh) */
1119
1120         header->image_size = le64_to_cpu(ondisk->image_size);
1121         header->snapc = snapc;
1122         header->snap_names = snap_names;
1123         header->snap_sizes = snap_sizes;
1124
1125         return 0;
1126 out_2big:
1127         ret = -EIO;
1128 out_err:
1129         kfree(snap_sizes);
1130         kfree(snap_names);
1131         ceph_put_snap_context(snapc);
1132         kfree(object_prefix);
1133
1134         return ret;
1135 }
1136
1137 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1138 {
1139         const char *snap_name;
1140
1141         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1142
1143         /* Skip over names until we find the one we are looking for */
1144
1145         snap_name = rbd_dev->header.snap_names;
1146         while (which--)
1147                 snap_name += strlen(snap_name) + 1;
1148
1149         return kstrdup(snap_name, GFP_KERNEL);
1150 }
1151
1152 /*
1153  * Snapshot id comparison function for use with qsort()/bsearch().
1154  * Note that result is for snapshots in *descending* order.
1155  */
1156 static int snapid_compare_reverse(const void *s1, const void *s2)
1157 {
1158         u64 snap_id1 = *(u64 *)s1;
1159         u64 snap_id2 = *(u64 *)s2;
1160
1161         if (snap_id1 < snap_id2)
1162                 return 1;
1163         return snap_id1 == snap_id2 ? 0 : -1;
1164 }
1165
1166 /*
1167  * Search a snapshot context to see if the given snapshot id is
1168  * present.
1169  *
1170  * Returns the position of the snapshot id in the array if it's found,
1171  * or BAD_SNAP_INDEX otherwise.
1172  *
1173  * Note: The snapshot array is kept sorted (by the osd) in
1174  * reverse order, highest snapshot id first.
1175  */
1176 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1177 {
1178         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1179         u64 *found;
1180
1181         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1182                                 sizeof (snap_id), snapid_compare_reverse);
1183
1184         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1185 }
1186
1187 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1188                                         u64 snap_id)
1189 {
1190         u32 which;
1191         const char *snap_name;
1192
1193         which = rbd_dev_snap_index(rbd_dev, snap_id);
1194         if (which == BAD_SNAP_INDEX)
1195                 return ERR_PTR(-ENOENT);
1196
1197         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1198         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1199 }
1200
1201 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1202 {
1203         if (snap_id == CEPH_NOSNAP)
1204                 return RBD_SNAP_HEAD_NAME;
1205
1206         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1207         if (rbd_dev->image_format == 1)
1208                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1209
1210         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1211 }
1212
1213 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1214                                 u64 *snap_size)
1215 {
1216         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1217         if (snap_id == CEPH_NOSNAP) {
1218                 *snap_size = rbd_dev->header.image_size;
1219         } else if (rbd_dev->image_format == 1) {
1220                 u32 which;
1221
1222                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1223                 if (which == BAD_SNAP_INDEX)
1224                         return -ENOENT;
1225
1226                 *snap_size = rbd_dev->header.snap_sizes[which];
1227         } else {
1228                 u64 size = 0;
1229                 int ret;
1230
1231                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1232                 if (ret)
1233                         return ret;
1234
1235                 *snap_size = size;
1236         }
1237         return 0;
1238 }
1239
1240 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1241 {
1242         u64 snap_id = rbd_dev->spec->snap_id;
1243         u64 size = 0;
1244         int ret;
1245
1246         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1247         if (ret)
1248                 return ret;
1249
1250         rbd_dev->mapping.size = size;
1251         return 0;
1252 }
1253
1254 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1255 {
1256         rbd_dev->mapping.size = 0;
1257 }
1258
1259 static void zero_bvec(struct bio_vec *bv)
1260 {
1261         void *buf;
1262         unsigned long flags;
1263
1264         buf = bvec_kmap_irq(bv, &flags);
1265         memset(buf, 0, bv->bv_len);
1266         flush_dcache_page(bv->bv_page);
1267         bvec_kunmap_irq(buf, &flags);
1268 }
1269
1270 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1271 {
1272         struct ceph_bio_iter it = *bio_pos;
1273
1274         ceph_bio_iter_advance(&it, off);
1275         ceph_bio_iter_advance_step(&it, bytes, ({
1276                 zero_bvec(&bv);
1277         }));
1278 }
1279
1280 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1281 {
1282         struct ceph_bvec_iter it = *bvec_pos;
1283
1284         ceph_bvec_iter_advance(&it, off);
1285         ceph_bvec_iter_advance_step(&it, bytes, ({
1286                 zero_bvec(&bv);
1287         }));
1288 }
1289
1290 /*
1291  * Zero a range in @obj_req data buffer defined by a bio (list) or
1292  * (private) bio_vec array.
1293  *
1294  * @off is relative to the start of the data buffer.
1295  */
1296 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1297                                u32 bytes)
1298 {
1299         dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1300
1301         switch (obj_req->img_request->data_type) {
1302         case OBJ_REQUEST_BIO:
1303                 zero_bios(&obj_req->bio_pos, off, bytes);
1304                 break;
1305         case OBJ_REQUEST_BVECS:
1306         case OBJ_REQUEST_OWN_BVECS:
1307                 zero_bvecs(&obj_req->bvec_pos, off, bytes);
1308                 break;
1309         default:
1310                 BUG();
1311         }
1312 }
1313
1314 static void rbd_obj_request_destroy(struct kref *kref);
1315 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1316 {
1317         rbd_assert(obj_request != NULL);
1318         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1319                 kref_read(&obj_request->kref));
1320         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1321 }
1322
1323 static void rbd_img_request_destroy(struct kref *kref);
1324 static void rbd_img_request_put(struct rbd_img_request *img_request)
1325 {
1326         rbd_assert(img_request != NULL);
1327         dout("%s: img %p (was %d)\n", __func__, img_request,
1328                 kref_read(&img_request->kref));
1329         kref_put(&img_request->kref, rbd_img_request_destroy);
1330 }
1331
1332 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1333                                         struct rbd_obj_request *obj_request)
1334 {
1335         rbd_assert(obj_request->img_request == NULL);
1336
1337         /* Image request now owns object's original reference */
1338         obj_request->img_request = img_request;
1339         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1340 }
1341
1342 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1343                                         struct rbd_obj_request *obj_request)
1344 {
1345         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1346         list_del(&obj_request->ex.oe_item);
1347         rbd_assert(obj_request->img_request == img_request);
1348         rbd_obj_request_put(obj_request);
1349 }
1350
1351 static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1352 {
1353         struct rbd_obj_request *obj_req = osd_req->r_priv;
1354
1355         dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1356              __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1357              obj_req->ex.oe_off, obj_req->ex.oe_len);
1358         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1359 }
1360
1361 /*
1362  * The default/initial value for all image request flags is 0.  Each
1363  * is conditionally set to 1 at image request initialization time
1364  * and currently never changes thereafter.
1365  */
1366 static void img_request_layered_set(struct rbd_img_request *img_request)
1367 {
1368         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1369         smp_mb();
1370 }
1371
1372 static void img_request_layered_clear(struct rbd_img_request *img_request)
1373 {
1374         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1375         smp_mb();
1376 }
1377
1378 static bool img_request_layered_test(struct rbd_img_request *img_request)
1379 {
1380         smp_mb();
1381         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1382 }
1383
1384 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1385 {
1386         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1387
1388         return !obj_req->ex.oe_off &&
1389                obj_req->ex.oe_len == rbd_dev->layout.object_size;
1390 }
1391
1392 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1393 {
1394         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1395
1396         return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1397                                         rbd_dev->layout.object_size;
1398 }
1399
1400 /*
1401  * Must be called after rbd_obj_calc_img_extents().
1402  */
1403 static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
1404 {
1405         if (!obj_req->num_img_extents ||
1406             (rbd_obj_is_entire(obj_req) &&
1407              !obj_req->img_request->snapc->num_snaps))
1408                 return false;
1409
1410         return true;
1411 }
1412
1413 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1414 {
1415         return ceph_file_extents_bytes(obj_req->img_extents,
1416                                        obj_req->num_img_extents);
1417 }
1418
1419 static bool rbd_img_is_write(struct rbd_img_request *img_req)
1420 {
1421         switch (img_req->op_type) {
1422         case OBJ_OP_READ:
1423                 return false;
1424         case OBJ_OP_WRITE:
1425         case OBJ_OP_DISCARD:
1426         case OBJ_OP_ZEROOUT:
1427                 return true;
1428         default:
1429                 BUG();
1430         }
1431 }
1432
1433 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1434 {
1435         struct rbd_obj_request *obj_req = osd_req->r_priv;
1436         int result;
1437
1438         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1439              osd_req->r_result, obj_req);
1440
1441         /*
1442          * Writes aren't allowed to return a data payload.  In some
1443          * guarded write cases (e.g. stat + zero on an empty object)
1444          * a stat response makes it through, but we don't care.
1445          */
1446         if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1447                 result = 0;
1448         else
1449                 result = osd_req->r_result;
1450
1451         rbd_obj_handle_request(obj_req, result);
1452 }
1453
1454 static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1455 {
1456         struct rbd_obj_request *obj_request = osd_req->r_priv;
1457
1458         osd_req->r_flags = CEPH_OSD_FLAG_READ;
1459         osd_req->r_snapid = obj_request->img_request->snap_id;
1460 }
1461
1462 static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1463 {
1464         struct rbd_obj_request *obj_request = osd_req->r_priv;
1465
1466         osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1467         ktime_get_real_ts64(&osd_req->r_mtime);
1468         osd_req->r_data_offset = obj_request->ex.oe_off;
1469 }
1470
1471 static struct ceph_osd_request *
1472 __rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1473                           struct ceph_snap_context *snapc, int num_ops)
1474 {
1475         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1476         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1477         struct ceph_osd_request *req;
1478         const char *name_format = rbd_dev->image_format == 1 ?
1479                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1480         int ret;
1481
1482         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1483         if (!req)
1484                 return ERR_PTR(-ENOMEM);
1485
1486         list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1487         req->r_callback = rbd_osd_req_callback;
1488         req->r_priv = obj_req;
1489
1490         /*
1491          * Data objects may be stored in a separate pool, but they always
1492          * use the same namespace as the header object does in its pool.
1493          */
1494         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1495         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1496
1497         ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1498                                rbd_dev->header.object_prefix,
1499                                obj_req->ex.oe_objno);
1500         if (ret)
1501                 return ERR_PTR(ret);
1502
1503         return req;
1504 }
1505
1506 static struct ceph_osd_request *
1507 rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1508 {
1509         return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1510                                          num_ops);
1511 }
1512
1513 static struct rbd_obj_request *rbd_obj_request_create(void)
1514 {
1515         struct rbd_obj_request *obj_request;
1516
1517         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1518         if (!obj_request)
1519                 return NULL;
1520
1521         ceph_object_extent_init(&obj_request->ex);
1522         INIT_LIST_HEAD(&obj_request->osd_reqs);
1523         mutex_init(&obj_request->state_mutex);
1524         kref_init(&obj_request->kref);
1525
1526         dout("%s %p\n", __func__, obj_request);
1527         return obj_request;
1528 }
1529
1530 static void rbd_obj_request_destroy(struct kref *kref)
1531 {
1532         struct rbd_obj_request *obj_request;
1533         struct ceph_osd_request *osd_req;
1534         u32 i;
1535
1536         obj_request = container_of(kref, struct rbd_obj_request, kref);
1537
1538         dout("%s: obj %p\n", __func__, obj_request);
1539
1540         while (!list_empty(&obj_request->osd_reqs)) {
1541                 osd_req = list_first_entry(&obj_request->osd_reqs,
1542                                     struct ceph_osd_request, r_private_item);
1543                 list_del_init(&osd_req->r_private_item);
1544                 ceph_osdc_put_request(osd_req);
1545         }
1546
1547         switch (obj_request->img_request->data_type) {
1548         case OBJ_REQUEST_NODATA:
1549         case OBJ_REQUEST_BIO:
1550         case OBJ_REQUEST_BVECS:
1551                 break;          /* Nothing to do */
1552         case OBJ_REQUEST_OWN_BVECS:
1553                 kfree(obj_request->bvec_pos.bvecs);
1554                 break;
1555         default:
1556                 BUG();
1557         }
1558
1559         kfree(obj_request->img_extents);
1560         if (obj_request->copyup_bvecs) {
1561                 for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1562                         if (obj_request->copyup_bvecs[i].bv_page)
1563                                 __free_page(obj_request->copyup_bvecs[i].bv_page);
1564                 }
1565                 kfree(obj_request->copyup_bvecs);
1566         }
1567
1568         kmem_cache_free(rbd_obj_request_cache, obj_request);
1569 }
1570
1571 /* It's OK to call this for a device with no parent */
1572
1573 static void rbd_spec_put(struct rbd_spec *spec);
1574 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1575 {
1576         rbd_dev_remove_parent(rbd_dev);
1577         rbd_spec_put(rbd_dev->parent_spec);
1578         rbd_dev->parent_spec = NULL;
1579         rbd_dev->parent_overlap = 0;
1580 }
1581
1582 /*
1583  * Parent image reference counting is used to determine when an
1584  * image's parent fields can be safely torn down--after there are no
1585  * more in-flight requests to the parent image.  When the last
1586  * reference is dropped, cleaning them up is safe.
1587  */
1588 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1589 {
1590         int counter;
1591
1592         if (!rbd_dev->parent_spec)
1593                 return;
1594
1595         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1596         if (counter > 0)
1597                 return;
1598
1599         /* Last reference; clean up parent data structures */
1600
1601         if (!counter)
1602                 rbd_dev_unparent(rbd_dev);
1603         else
1604                 rbd_warn(rbd_dev, "parent reference underflow");
1605 }
1606
1607 /*
1608  * If an image has a non-zero parent overlap, get a reference to its
1609  * parent.
1610  *
1611  * Returns true if the rbd device has a parent with a non-zero
1612  * overlap and a reference for it was successfully taken, or
1613  * false otherwise.
1614  */
1615 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1616 {
1617         int counter = 0;
1618
1619         if (!rbd_dev->parent_spec)
1620                 return false;
1621
1622         down_read(&rbd_dev->header_rwsem);
1623         if (rbd_dev->parent_overlap)
1624                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1625         up_read(&rbd_dev->header_rwsem);
1626
1627         if (counter < 0)
1628                 rbd_warn(rbd_dev, "parent reference overflow");
1629
1630         return counter > 0;
1631 }
1632
1633 /*
1634  * Caller is responsible for filling in the list of object requests
1635  * that comprises the image request, and the Linux request pointer
1636  * (if there is one).
1637  */
1638 static struct rbd_img_request *rbd_img_request_create(
1639                                         struct rbd_device *rbd_dev,
1640                                         enum obj_operation_type op_type,
1641                                         struct ceph_snap_context *snapc)
1642 {
1643         struct rbd_img_request *img_request;
1644
1645         img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
1646         if (!img_request)
1647                 return NULL;
1648
1649         img_request->rbd_dev = rbd_dev;
1650         img_request->op_type = op_type;
1651         if (!rbd_img_is_write(img_request))
1652                 img_request->snap_id = rbd_dev->spec->snap_id;
1653         else
1654                 img_request->snapc = snapc;
1655
1656         if (rbd_dev_parent_get(rbd_dev))
1657                 img_request_layered_set(img_request);
1658
1659         INIT_LIST_HEAD(&img_request->lock_item);
1660         INIT_LIST_HEAD(&img_request->object_extents);
1661         mutex_init(&img_request->state_mutex);
1662         kref_init(&img_request->kref);
1663
1664         return img_request;
1665 }
1666
1667 static void rbd_img_request_destroy(struct kref *kref)
1668 {
1669         struct rbd_img_request *img_request;
1670         struct rbd_obj_request *obj_request;
1671         struct rbd_obj_request *next_obj_request;
1672
1673         img_request = container_of(kref, struct rbd_img_request, kref);
1674
1675         dout("%s: img %p\n", __func__, img_request);
1676
1677         WARN_ON(!list_empty(&img_request->lock_item));
1678         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1679                 rbd_img_obj_request_del(img_request, obj_request);
1680
1681         if (img_request_layered_test(img_request)) {
1682                 img_request_layered_clear(img_request);
1683                 rbd_dev_parent_put(img_request->rbd_dev);
1684         }
1685
1686         if (rbd_img_is_write(img_request))
1687                 ceph_put_snap_context(img_request->snapc);
1688
1689         kmem_cache_free(rbd_img_request_cache, img_request);
1690 }
1691
1692 #define BITS_PER_OBJ    2
1693 #define OBJS_PER_BYTE   (BITS_PER_BYTE / BITS_PER_OBJ)
1694 #define OBJ_MASK        ((1 << BITS_PER_OBJ) - 1)
1695
1696 static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1697                                    u64 *index, u8 *shift)
1698 {
1699         u32 off;
1700
1701         rbd_assert(objno < rbd_dev->object_map_size);
1702         *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1703         *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1704 }
1705
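/*
 * Packing example: with BITS_PER_OBJ == 2 and OBJS_PER_BYTE == 4,
 * objno 5 maps to index == 1 and off == 1, so shift == (4 - 1 - 1) * 2
 * == 4 and that object's state occupies bits 5..4 of object_map[1].
 * Higher-numbered objects within a byte sit in lower-order bit pairs.
 */
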
1706 static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1707 {
1708         u64 index;
1709         u8 shift;
1710
1711         lockdep_assert_held(&rbd_dev->object_map_lock);
1712         __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1713         return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1714 }
1715
1716 static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1717 {
1718         u64 index;
1719         u8 shift;
1720         u8 *p;
1721
1722         lockdep_assert_held(&rbd_dev->object_map_lock);
1723         rbd_assert(!(val & ~OBJ_MASK));
1724
1725         __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1726         p = &rbd_dev->object_map[index];
1727         *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1728 }
1729
1730 static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1731 {
1732         u8 state;
1733
1734         spin_lock(&rbd_dev->object_map_lock);
1735         state = __rbd_object_map_get(rbd_dev, objno);
1736         spin_unlock(&rbd_dev->object_map_lock);
1737         return state;
1738 }
1739
1740 static bool use_object_map(struct rbd_device *rbd_dev)
1741 {
1742         /*
1743          * An image mapped read-only can't use the object map -- it isn't
1744          * loaded because the header lock isn't acquired.  Someone else can
1745          * write to the image and update the object map behind our back.
1746          *
1747          * A snapshot can't be written to, so using the object map is always
1748          * safe.
1749          */
1750         if (!rbd_is_snap(rbd_dev) && rbd_is_ro(rbd_dev))
1751                 return false;
1752
1753         return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1754                 !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1755 }
1756
1757 static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1758 {
1759         u8 state;
1760
1761         /* fall back to default logic if object map is disabled or invalid */
1762         if (!use_object_map(rbd_dev))
1763                 return true;
1764
1765         state = rbd_object_map_get(rbd_dev, objno);
1766         return state != OBJECT_NONEXISTENT;
1767 }
1768
1769 static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1770                                 struct ceph_object_id *oid)
1771 {
1772         if (snap_id == CEPH_NOSNAP)
1773                 ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1774                                 rbd_dev->spec->image_id);
1775         else
1776                 ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1777                                 rbd_dev->spec->image_id, snap_id);
1778 }
1779
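/*
 * Example OIDs, assuming RBD_OBJECT_MAP_PREFIX expands to
 * "rbd_object_map." and a (made-up) image id of "abcdef":
 *
 *   HEAD object map:       rbd_object_map.abcdef
 *   snap_id 5 object map:  rbd_object_map.abcdef.0000000000000005
 */
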
1780 static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1781 {
1782         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1783         CEPH_DEFINE_OID_ONSTACK(oid);
1784         u8 lock_type;
1785         char *lock_tag;
1786         struct ceph_locker *lockers;
1787         u32 num_lockers;
1788         bool broke_lock = false;
1789         int ret;
1790
1791         rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1792
1793 again:
1794         ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1795                             CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1796         if (ret != -EBUSY || broke_lock) {
1797                 if (ret == -EEXIST)
1798                         ret = 0; /* already locked by myself */
1799                 if (ret)
1800                         rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1801                 return ret;
1802         }
1803
1804         ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1805                                  RBD_LOCK_NAME, &lock_type, &lock_tag,
1806                                  &lockers, &num_lockers);
1807         if (ret) {
1808                 if (ret == -ENOENT)
1809                         goto again;
1810
1811                 rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1812                 return ret;
1813         }
1814
1815         kfree(lock_tag);
1816         if (num_lockers == 0)
1817                 goto again;
1818
1819         rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1820                  ENTITY_NAME(lockers[0].id.name));
1821
1822         ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1823                                   RBD_LOCK_NAME, lockers[0].id.cookie,
1824                                   &lockers[0].id.name);
1825         ceph_free_lockers(lockers, num_lockers);
1826         if (ret) {
1827                 if (ret == -ENOENT)
1828                         goto again;
1829
1830                 rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1831                 return ret;
1832         }
1833
1834         broke_lock = true;
1835         goto again;
1836 }
1837
1838 static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1839 {
1840         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1841         CEPH_DEFINE_OID_ONSTACK(oid);
1842         int ret;
1843
1844         rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1845
1846         ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1847                               "");
1848         if (ret && ret != -ENOENT)
1849                 rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1850 }
1851
1852 static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1853 {
1854         u8 struct_v;
1855         u32 struct_len;
1856         u32 header_len;
1857         void *header_end;
1858         int ret;
1859
1860         ceph_decode_32_safe(p, end, header_len, e_inval);
1861         header_end = *p + header_len;
1862
1863         ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1864                                   &struct_len);
1865         if (ret)
1866                 return ret;
1867
1868         ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1869
1870         *p = header_end;
1871         return 0;
1872
1873 e_inval:
1874         return -EINVAL;
1875 }
1876
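/*
 * As implied by the decoder above, the object map header on the wire is:
 *
 *   le32 header_len;               (length of the BitVector header)
 *   struct_v/compat/struct_len     (ceph_start_decoding() envelope)
 *   le64 object_map_size;          (number of objects in the map)
 *   ...                            (rest of the header, skipped)
 *
 * On success *p is advanced to header_end so that the caller can copy
 * the bitmap bytes which immediately follow the header.
 */
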
1877 static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1878 {
1879         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1880         CEPH_DEFINE_OID_ONSTACK(oid);
1881         struct page **pages;
1882         void *p, *end;
1883         size_t reply_len;
1884         u64 num_objects;
1885         u64 object_map_bytes;
1886         u64 object_map_size;
1887         int num_pages;
1888         int ret;
1889
1890         rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1891
1892         num_objects = ceph_get_num_objects(&rbd_dev->layout,
1893                                            rbd_dev->mapping.size);
1894         object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1895                                             BITS_PER_BYTE);
1896         num_pages = calc_pages_for(0, object_map_bytes) + 1;
1897         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1898         if (IS_ERR(pages))
1899                 return PTR_ERR(pages);
1900
1901         reply_len = num_pages * PAGE_SIZE;
1902         rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1903         ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1904                              "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1905                              NULL, 0, pages, &reply_len);
1906         if (ret)
1907                 goto out;
1908
1909         p = page_address(pages[0]);
1910         end = p + min(reply_len, (size_t)PAGE_SIZE);
1911         ret = decode_object_map_header(&p, end, &object_map_size);
1912         if (ret)
1913                 goto out;
1914
1915         if (object_map_size != num_objects) {
1916                 rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
1917                          object_map_size, num_objects);
1918                 ret = -EINVAL;
1919                 goto out;
1920         }
1921
1922         if (offset_in_page(p) + object_map_bytes > reply_len) {
1923                 ret = -EINVAL;
1924                 goto out;
1925         }
1926
1927         rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
1928         if (!rbd_dev->object_map) {
1929                 ret = -ENOMEM;
1930                 goto out;
1931         }
1932
1933         rbd_dev->object_map_size = object_map_size;
1934         ceph_copy_from_page_vector(pages, rbd_dev->object_map,
1935                                    offset_in_page(p), object_map_bytes);
1936
1937 out:
1938         ceph_release_page_vector(pages, num_pages);
1939         return ret;
1940 }
1941
1942 static void rbd_object_map_free(struct rbd_device *rbd_dev)
1943 {
1944         kvfree(rbd_dev->object_map);
1945         rbd_dev->object_map = NULL;
1946         rbd_dev->object_map_size = 0;
1947 }
1948
1949 static int rbd_object_map_load(struct rbd_device *rbd_dev)
1950 {
1951         int ret;
1952
1953         ret = __rbd_object_map_load(rbd_dev);
1954         if (ret)
1955                 return ret;
1956
1957         ret = rbd_dev_v2_get_flags(rbd_dev);
1958         if (ret) {
1959                 rbd_object_map_free(rbd_dev);
1960                 return ret;
1961         }
1962
1963         if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
1964                 rbd_warn(rbd_dev, "object map is invalid");
1965
1966         return 0;
1967 }
1968
1969 static int rbd_object_map_open(struct rbd_device *rbd_dev)
1970 {
1971         int ret;
1972
1973         ret = rbd_object_map_lock(rbd_dev);
1974         if (ret)
1975                 return ret;
1976
1977         ret = rbd_object_map_load(rbd_dev);
1978         if (ret) {
1979                 rbd_object_map_unlock(rbd_dev);
1980                 return ret;
1981         }
1982
1983         return 0;
1984 }
1985
1986 static void rbd_object_map_close(struct rbd_device *rbd_dev)
1987 {
1988         rbd_object_map_free(rbd_dev);
1989         rbd_object_map_unlock(rbd_dev);
1990 }
1991
1992 /*
1993  * This function needs snap_id (or more precisely just something to
1994  * distinguish between HEAD and snapshot object maps), new_state and
1995  * current_state that were passed to rbd_object_map_update().
1996  *
1997  * To avoid allocating and stashing a context we piggyback on the OSD
1998  * request.  A HEAD update has two ops (assert_locked + the update);
1999  * a snapshot update has only one.  For new_state and current_state we
2000  * decode our own object_map_update op, encoded in rbd_cls_object_map_update().
2001  */
2002 static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
2003                                         struct ceph_osd_request *osd_req)
2004 {
2005         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2006         struct ceph_osd_data *osd_data;
2007         u64 objno;
2008         u8 state, new_state, uninitialized_var(current_state);
2009         bool has_current_state;
2010         void *p;
2011
2012         if (osd_req->r_result)
2013                 return osd_req->r_result;
2014
2015         /*
2016          * Nothing to do for a snapshot object map.
2017          */
2018         if (osd_req->r_num_ops == 1)
2019                 return 0;
2020
2021         /*
2022          * Update in-memory HEAD object map.
2023          */
2024         rbd_assert(osd_req->r_num_ops == 2);
2025         osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
2026         rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
2027
2028         p = page_address(osd_data->pages[0]);
2029         objno = ceph_decode_64(&p);
2030         rbd_assert(objno == obj_req->ex.oe_objno);
2031         rbd_assert(ceph_decode_64(&p) == objno + 1);
2032         new_state = ceph_decode_8(&p);
2033         has_current_state = ceph_decode_8(&p);
2034         if (has_current_state)
2035                 current_state = ceph_decode_8(&p);
2036
2037         spin_lock(&rbd_dev->object_map_lock);
2038         state = __rbd_object_map_get(rbd_dev, objno);
2039         if (!has_current_state || current_state == state ||
2040             (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
2041                 __rbd_object_map_set(rbd_dev, objno, new_state);
2042         spin_unlock(&rbd_dev->object_map_lock);
2043
2044         return 0;
2045 }
2046
2047 static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
2048 {
2049         struct rbd_obj_request *obj_req = osd_req->r_priv;
2050         int result;
2051
2052         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
2053              osd_req->r_result, obj_req);
2054
2055         result = rbd_object_map_update_finish(obj_req, osd_req);
2056         rbd_obj_handle_request(obj_req, result);
2057 }
2058
2059 static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
2060 {
2061         u8 state = rbd_object_map_get(rbd_dev, objno);
2062
2063         if (state == new_state ||
2064             (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2065             (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2066                 return false;
2067
2068         return true;
2069 }
2070
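/*
 * In other words, a HEAD object map update is skipped when it would be
 * a no-op: the object already has the requested state, an object that
 * is already OBJECT_NONEXISTENT is being marked OBJECT_PENDING, or an
 * object is being marked OBJECT_NONEXISTENT without having gone
 * through OBJECT_PENDING first.
 */
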
2071 static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2072                                      int which, u64 objno, u8 new_state,
2073                                      const u8 *current_state)
2074 {
2075         struct page **pages;
2076         void *p, *start;
2077         int ret;
2078
2079         ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2080         if (ret)
2081                 return ret;
2082
2083         pages = ceph_alloc_page_vector(1, GFP_NOIO);
2084         if (IS_ERR(pages))
2085                 return PTR_ERR(pages);
2086
2087         p = start = page_address(pages[0]);
2088         ceph_encode_64(&p, objno);
2089         ceph_encode_64(&p, objno + 1);
2090         ceph_encode_8(&p, new_state);
2091         if (current_state) {
2092                 ceph_encode_8(&p, 1);
2093                 ceph_encode_8(&p, *current_state);
2094         } else {
2095                 ceph_encode_8(&p, 0);
2096         }
2097
2098         osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2099                                           false, true);
2100         return 0;
2101 }
2102
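/*
 * The request data page assembled above therefore carries:
 *
 *   le64 start_objno;        (objno)
 *   le64 end_objno;          (objno + 1, i.e. a single-object range)
 *   u8   new_state;
 *   u8   has_current_state;  (0 or 1)
 *   u8   current_state;      (present only if has_current_state)
 *
 * rbd_object_map_update_finish() decodes this same buffer from the
 * completed OSD request to update the in-memory HEAD object map.
 */
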
2103 /*
2104  * Return:
2105  *   0 - object map update sent
2106  *   1 - object map update isn't needed
2107  *  <0 - error
2108  */
2109 static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2110                                  u8 new_state, const u8 *current_state)
2111 {
2112         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2113         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2114         struct ceph_osd_request *req;
2115         int num_ops = 1;
2116         int which = 0;
2117         int ret;
2118
2119         if (snap_id == CEPH_NOSNAP) {
2120                 if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2121                         return 1;
2122
2123                 num_ops++; /* assert_locked */
2124         }
2125
2126         req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2127         if (!req)
2128                 return -ENOMEM;
2129
2130         list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2131         req->r_callback = rbd_object_map_callback;
2132         req->r_priv = obj_req;
2133
2134         rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2135         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2136         req->r_flags = CEPH_OSD_FLAG_WRITE;
2137         ktime_get_real_ts64(&req->r_mtime);
2138
2139         if (snap_id == CEPH_NOSNAP) {
2140                 /*
2141                  * Protect against possible race conditions during lock
2142                  * ownership transitions.
2143                  */
2144                 ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2145                                              CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2146                 if (ret)
2147                         return ret;
2148         }
2149
2150         ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2151                                         new_state, current_state);
2152         if (ret)
2153                 return ret;
2154
2155         ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2156         if (ret)
2157                 return ret;
2158
2159         ceph_osdc_start_request(osdc, req, false);
2160         return 0;
2161 }
2162
2163 static void prune_extents(struct ceph_file_extent *img_extents,
2164                           u32 *num_img_extents, u64 overlap)
2165 {
2166         u32 cnt = *num_img_extents;
2167
2168         /* drop extents completely beyond the overlap */
2169         while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2170                 cnt--;
2171
2172         if (cnt) {
2173                 struct ceph_file_extent *ex = &img_extents[cnt - 1];
2174
2175                 /* trim final overlapping extent */
2176                 if (ex->fe_off + ex->fe_len > overlap)
2177                         ex->fe_len = overlap - ex->fe_off;
2178         }
2179
2180         *num_img_extents = cnt;
2181 }
2182
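/*
 * For example, with an overlap of 100 the extent list
 * { 40~30, 90~20, 120~10 } is pruned to { 40~30, 90~10 }: the extent
 * that starts beyond the overlap is dropped and the one straddling it
 * is trimmed to end exactly at the overlap.
 */
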
2183 /*
2184  * Determine the byte range(s) covered by either just the object extent
2185  * or the entire object in the parent image.
2186  */
2187 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2188                                     bool entire)
2189 {
2190         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2191         int ret;
2192
2193         if (!rbd_dev->parent_overlap)
2194                 return 0;
2195
2196         ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2197                                   entire ? 0 : obj_req->ex.oe_off,
2198                                   entire ? rbd_dev->layout.object_size :
2199                                                         obj_req->ex.oe_len,
2200                                   &obj_req->img_extents,
2201                                   &obj_req->num_img_extents);
2202         if (ret)
2203                 return ret;
2204
2205         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2206                       rbd_dev->parent_overlap);
2207         return 0;
2208 }
2209
2210 static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2211 {
2212         struct rbd_obj_request *obj_req = osd_req->r_priv;
2213
2214         switch (obj_req->img_request->data_type) {
2215         case OBJ_REQUEST_BIO:
2216                 osd_req_op_extent_osd_data_bio(osd_req, which,
2217                                                &obj_req->bio_pos,
2218                                                obj_req->ex.oe_len);
2219                 break;
2220         case OBJ_REQUEST_BVECS:
2221         case OBJ_REQUEST_OWN_BVECS:
2222                 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2223                                                         obj_req->ex.oe_len);
2224                 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2225                 osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2226                                                     &obj_req->bvec_pos);
2227                 break;
2228         default:
2229                 BUG();
2230         }
2231 }
2232
2233 static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2234 {
2235         struct page **pages;
2236
2237         /*
2238          * The response data for a STAT call consists of:
2239          *     le64 length;
2240          *     struct {
2241          *         le32 tv_sec;
2242          *         le32 tv_nsec;
2243          *     } mtime;
2244          */
2245         pages = ceph_alloc_page_vector(1, GFP_NOIO);
2246         if (IS_ERR(pages))
2247                 return PTR_ERR(pages);
2248
2249         osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2250         osd_req_op_raw_data_in_pages(osd_req, which, pages,
2251                                      8 + sizeof(struct ceph_timespec),
2252                                      0, false, true);
2253         return 0;
2254 }
2255
2256 static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2257                                 u32 bytes)
2258 {
2259         struct rbd_obj_request *obj_req = osd_req->r_priv;
2260         int ret;
2261
2262         ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2263         if (ret)
2264                 return ret;
2265
2266         osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2267                                           obj_req->copyup_bvec_count, bytes);
2268         return 0;
2269 }
2270
2271 static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2272 {
2273         obj_req->read_state = RBD_OBJ_READ_START;
2274         return 0;
2275 }
2276
2277 static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2278                                       int which)
2279 {
2280         struct rbd_obj_request *obj_req = osd_req->r_priv;
2281         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2282         u16 opcode;
2283
2284         if (!use_object_map(rbd_dev) ||
2285             !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2286                 osd_req_op_alloc_hint_init(osd_req, which++,
2287                                            rbd_dev->layout.object_size,
2288                                            rbd_dev->layout.object_size);
2289         }
2290
2291         if (rbd_obj_is_entire(obj_req))
2292                 opcode = CEPH_OSD_OP_WRITEFULL;
2293         else
2294                 opcode = CEPH_OSD_OP_WRITE;
2295
2296         osd_req_op_extent_init(osd_req, which, opcode,
2297                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2298         rbd_osd_setup_data(osd_req, which);
2299 }
2300
2301 static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2302 {
2303         int ret;
2304
2305         /* reverse map the entire object onto the parent */
2306         ret = rbd_obj_calc_img_extents(obj_req, true);
2307         if (ret)
2308                 return ret;
2309
2310         if (rbd_obj_copyup_enabled(obj_req))
2311                 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2312
2313         obj_req->write_state = RBD_OBJ_WRITE_START;
2314         return 0;
2315 }
2316
2317 static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2318 {
2319         return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2320                                           CEPH_OSD_OP_ZERO;
2321 }
2322
2323 static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2324                                         int which)
2325 {
2326         struct rbd_obj_request *obj_req = osd_req->r_priv;
2327
2328         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2329                 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2330                 osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2331         } else {
2332                 osd_req_op_extent_init(osd_req, which,
2333                                        truncate_or_zero_opcode(obj_req),
2334                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2335                                        0, 0);
2336         }
2337 }
2338
2339 static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2340 {
2341         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2342         u64 off, next_off;
2343         int ret;
2344
2345         /*
2346          * Align the range to alloc_size boundary and punt on discards
2347          * that are too small to free up any space.
2348          *
2349          * alloc_size == object_size && is_tail() is a special case for
2350          * filestore with filestore_punch_hole = false, needed to allow
2351          * truncate (in addition to delete).
2352          */
2353         if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2354             !rbd_obj_is_tail(obj_req)) {
2355                 off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2356                 next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2357                                       rbd_dev->opts->alloc_size);
2358                 if (off >= next_off)
2359                         return 1;
2360
2361                 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2362                      obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2363                      off, next_off - off);
2364                 obj_req->ex.oe_off = off;
2365                 obj_req->ex.oe_len = next_off - off;
2366         }
2367
2368         /* reverse map the entire object onto the parent */
2369         ret = rbd_obj_calc_img_extents(obj_req, true);
2370         if (ret)
2371                 return ret;
2372
2373         obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2374         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2375                 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2376
2377         obj_req->write_state = RBD_OBJ_WRITE_START;
2378         return 0;
2379 }
2380
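/*
 * Alignment example: with alloc_size == 64K, a discard of 10K~140K
 * within one object is adjusted to 64K~64K (the offset rounds up to
 * 64K, the end rounds down to 128K), while a 4K discard lying entirely
 * inside a single 64K allocation unit ends up with off >= next_off and
 * is dropped by returning 1.
 */
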
2381 static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2382                                         int which)
2383 {
2384         struct rbd_obj_request *obj_req = osd_req->r_priv;
2385         u16 opcode;
2386
2387         if (rbd_obj_is_entire(obj_req)) {
2388                 if (obj_req->num_img_extents) {
2389                         if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2390                                 osd_req_op_init(osd_req, which++,
2391                                                 CEPH_OSD_OP_CREATE, 0);
2392                         opcode = CEPH_OSD_OP_TRUNCATE;
2393                 } else {
2394                         rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2395                         osd_req_op_init(osd_req, which++,
2396                                         CEPH_OSD_OP_DELETE, 0);
2397                         opcode = 0;
2398                 }
2399         } else {
2400                 opcode = truncate_or_zero_opcode(obj_req);
2401         }
2402
2403         if (opcode)
2404                 osd_req_op_extent_init(osd_req, which, opcode,
2405                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2406                                        0, 0);
2407 }
2408
2409 static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2410 {
2411         int ret;
2412
2413         /* reverse map the entire object onto the parent */
2414         ret = rbd_obj_calc_img_extents(obj_req, true);
2415         if (ret)
2416                 return ret;
2417
2418         if (rbd_obj_copyup_enabled(obj_req))
2419                 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2420         if (!obj_req->num_img_extents) {
2421                 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2422                 if (rbd_obj_is_entire(obj_req))
2423                         obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2424         }
2425
2426         obj_req->write_state = RBD_OBJ_WRITE_START;
2427         return 0;
2428 }
2429
2430 static int count_write_ops(struct rbd_obj_request *obj_req)
2431 {
2432         struct rbd_img_request *img_req = obj_req->img_request;
2433
2434         switch (img_req->op_type) {
2435         case OBJ_OP_WRITE:
2436                 if (!use_object_map(img_req->rbd_dev) ||
2437                     !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2438                         return 2; /* setallochint + write/writefull */
2439
2440                 return 1; /* write/writefull */
2441         case OBJ_OP_DISCARD:
2442                 return 1; /* delete/truncate/zero */
2443         case OBJ_OP_ZEROOUT:
2444                 if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2445                     !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2446                         return 2; /* create + truncate */
2447
2448                 return 1; /* delete/truncate/zero */
2449         default:
2450                 BUG();
2451         }
2452 }
2453
2454 static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2455                                     int which)
2456 {
2457         struct rbd_obj_request *obj_req = osd_req->r_priv;
2458
2459         switch (obj_req->img_request->op_type) {
2460         case OBJ_OP_WRITE:
2461                 __rbd_osd_setup_write_ops(osd_req, which);
2462                 break;
2463         case OBJ_OP_DISCARD:
2464                 __rbd_osd_setup_discard_ops(osd_req, which);
2465                 break;
2466         case OBJ_OP_ZEROOUT:
2467                 __rbd_osd_setup_zeroout_ops(osd_req, which);
2468                 break;
2469         default:
2470                 BUG();
2471         }
2472 }
2473
2474 /*
2475  * Prune the list of object requests (adjust offset and/or length, drop
2476  * redundant requests).  Prepare object request state machines and image
2477  * request state machine for execution.
2478  */
2479 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2480 {
2481         struct rbd_obj_request *obj_req, *next_obj_req;
2482         int ret;
2483
2484         for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2485                 switch (img_req->op_type) {
2486                 case OBJ_OP_READ:
2487                         ret = rbd_obj_init_read(obj_req);
2488                         break;
2489                 case OBJ_OP_WRITE:
2490                         ret = rbd_obj_init_write(obj_req);
2491                         break;
2492                 case OBJ_OP_DISCARD:
2493                         ret = rbd_obj_init_discard(obj_req);
2494                         break;
2495                 case OBJ_OP_ZEROOUT:
2496                         ret = rbd_obj_init_zeroout(obj_req);
2497                         break;
2498                 default:
2499                         BUG();
2500                 }
2501                 if (ret < 0)
2502                         return ret;
2503                 if (ret > 0) {
2504                         rbd_img_obj_request_del(img_req, obj_req);
2505                         continue;
2506                 }
2507         }
2508
2509         img_req->state = RBD_IMG_START;
2510         return 0;
2511 }
2512
2513 union rbd_img_fill_iter {
2514         struct ceph_bio_iter    bio_iter;
2515         struct ceph_bvec_iter   bvec_iter;
2516 };
2517
2518 struct rbd_img_fill_ctx {
2519         enum obj_request_type   pos_type;
2520         union rbd_img_fill_iter *pos;
2521         union rbd_img_fill_iter iter;
2522         ceph_object_extent_fn_t set_pos_fn;
2523         ceph_object_extent_fn_t count_fn;
2524         ceph_object_extent_fn_t copy_fn;
2525 };
2526
2527 static struct ceph_object_extent *alloc_object_extent(void *arg)
2528 {
2529         struct rbd_img_request *img_req = arg;
2530         struct rbd_obj_request *obj_req;
2531
2532         obj_req = rbd_obj_request_create();
2533         if (!obj_req)
2534                 return NULL;
2535
2536         rbd_img_obj_request_add(img_req, obj_req);
2537         return &obj_req->ex;
2538 }
2539
2540 /*
2541  * While su != os && sc == 1 is technically not fancy (it's the same
2542  * layout as su == os && sc == 1), we can't use the nocopy path for it
2543  * because ->set_pos_fn() should be called only once per object.
2544  * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2545  * treat su != os && sc == 1 as fancy.
2546  */
2547 static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2548 {
2549         return l->stripe_unit != l->object_size;
2550 }
2551
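/*
 * For example, stripe_unit == object_size (the common su == os, sc == 1
 * layout) is not fancy, so the data can be referenced in place via the
 * nocopy path, whereas stripe_unit == 64K with object_size == 4M is
 * fancy and forces the OBJ_REQUEST_OWN_BVECS copying path below.
 */
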
2552 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2553                                        struct ceph_file_extent *img_extents,
2554                                        u32 num_img_extents,
2555                                        struct rbd_img_fill_ctx *fctx)
2556 {
2557         u32 i;
2558         int ret;
2559
2560         img_req->data_type = fctx->pos_type;
2561
2562         /*
2563          * Create object requests and set each object request's starting
2564          * position in the provided bio (list) or bio_vec array.
2565          */
2566         fctx->iter = *fctx->pos;
2567         for (i = 0; i < num_img_extents; i++) {
2568                 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2569                                            img_extents[i].fe_off,
2570                                            img_extents[i].fe_len,
2571                                            &img_req->object_extents,
2572                                            alloc_object_extent, img_req,
2573                                            fctx->set_pos_fn, &fctx->iter);
2574                 if (ret)
2575                         return ret;
2576         }
2577
2578         return __rbd_img_fill_request(img_req);
2579 }
2580
2581 /*
2582  * Map a list of image extents to a list of object extents, create the
2583  * corresponding object requests (normally each to a different object,
2584  * but not always) and add them to @img_req.  For each object request,
2585  * set up its data descriptor to point to the corresponding chunk(s) of
2586  * @fctx->pos data buffer.
2587  *
2588  * Because ceph_file_to_extents() will merge adjacent object extents
2589  * together, each object request's data descriptor may point to multiple
2590  * different chunks of @fctx->pos data buffer.
2591  *
2592  * @fctx->pos data buffer is assumed to be large enough.
2593  */
2594 static int rbd_img_fill_request(struct rbd_img_request *img_req,
2595                                 struct ceph_file_extent *img_extents,
2596                                 u32 num_img_extents,
2597                                 struct rbd_img_fill_ctx *fctx)
2598 {
2599         struct rbd_device *rbd_dev = img_req->rbd_dev;
2600         struct rbd_obj_request *obj_req;
2601         u32 i;
2602         int ret;
2603
2604         if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2605             !rbd_layout_is_fancy(&rbd_dev->layout))
2606                 return rbd_img_fill_request_nocopy(img_req, img_extents,
2607                                                    num_img_extents, fctx);
2608
2609         img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2610
2611         /*
2612          * Create object requests and determine ->bvec_count for each object
2613          * request.  Note that ->bvec_count sum over all object requests may
2614          * be greater than the number of bio_vecs in the provided bio (list)
2615          * or bio_vec array because when mapped, those bio_vecs can straddle
2616          * stripe unit boundaries.
2617          */
2618         fctx->iter = *fctx->pos;
2619         for (i = 0; i < num_img_extents; i++) {
2620                 ret = ceph_file_to_extents(&rbd_dev->layout,
2621                                            img_extents[i].fe_off,
2622                                            img_extents[i].fe_len,
2623                                            &img_req->object_extents,
2624                                            alloc_object_extent, img_req,
2625                                            fctx->count_fn, &fctx->iter);
2626                 if (ret)
2627                         return ret;
2628         }
2629
2630         for_each_obj_request(img_req, obj_req) {
2631                 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2632                                               sizeof(*obj_req->bvec_pos.bvecs),
2633                                               GFP_NOIO);
2634                 if (!obj_req->bvec_pos.bvecs)
2635                         return -ENOMEM;
2636         }
2637
2638         /*
2639          * Fill in each object request's private bio_vec array, splitting and
2640          * rearranging the provided bio_vecs in stripe unit chunks as needed.
2641          */
2642         fctx->iter = *fctx->pos;
2643         for (i = 0; i < num_img_extents; i++) {
2644                 ret = ceph_iterate_extents(&rbd_dev->layout,
2645                                            img_extents[i].fe_off,
2646                                            img_extents[i].fe_len,
2647                                            &img_req->object_extents,
2648                                            fctx->copy_fn, &fctx->iter);
2649                 if (ret)
2650                         return ret;
2651         }
2652
2653         return __rbd_img_fill_request(img_req);
2654 }
2655
2656 static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2657                                u64 off, u64 len)
2658 {
2659         struct ceph_file_extent ex = { off, len };
2660         union rbd_img_fill_iter dummy = {};
2661         struct rbd_img_fill_ctx fctx = {
2662                 .pos_type = OBJ_REQUEST_NODATA,
2663                 .pos = &dummy,
2664         };
2665
2666         return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2667 }
2668
2669 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2670 {
2671         struct rbd_obj_request *obj_req =
2672             container_of(ex, struct rbd_obj_request, ex);
2673         struct ceph_bio_iter *it = arg;
2674
2675         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2676         obj_req->bio_pos = *it;
2677         ceph_bio_iter_advance(it, bytes);
2678 }
2679
2680 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2681 {
2682         struct rbd_obj_request *obj_req =
2683             container_of(ex, struct rbd_obj_request, ex);
2684         struct ceph_bio_iter *it = arg;
2685
2686         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2687         ceph_bio_iter_advance_step(it, bytes, ({
2688                 obj_req->bvec_count++;
2689         }));
2691 }
2692
2693 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2694 {
2695         struct rbd_obj_request *obj_req =
2696             container_of(ex, struct rbd_obj_request, ex);
2697         struct ceph_bio_iter *it = arg;
2698
2699         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2700         ceph_bio_iter_advance_step(it, bytes, ({
2701                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2702                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2703         }));
2704 }
2705
2706 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2707                                    struct ceph_file_extent *img_extents,
2708                                    u32 num_img_extents,
2709                                    struct ceph_bio_iter *bio_pos)
2710 {
2711         struct rbd_img_fill_ctx fctx = {
2712                 .pos_type = OBJ_REQUEST_BIO,
2713                 .pos = (union rbd_img_fill_iter *)bio_pos,
2714                 .set_pos_fn = set_bio_pos,
2715                 .count_fn = count_bio_bvecs,
2716                 .copy_fn = copy_bio_bvecs,
2717         };
2718
2719         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2720                                     &fctx);
2721 }
2722
2723 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2724                                  u64 off, u64 len, struct bio *bio)
2725 {
2726         struct ceph_file_extent ex = { off, len };
2727         struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2728
2729         return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2730 }
2731
2732 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2733 {
2734         struct rbd_obj_request *obj_req =
2735             container_of(ex, struct rbd_obj_request, ex);
2736         struct ceph_bvec_iter *it = arg;
2737
2738         obj_req->bvec_pos = *it;
2739         ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2740         ceph_bvec_iter_advance(it, bytes);
2741 }
2742
2743 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2744 {
2745         struct rbd_obj_request *obj_req =
2746             container_of(ex, struct rbd_obj_request, ex);
2747         struct ceph_bvec_iter *it = arg;
2748
2749         ceph_bvec_iter_advance_step(it, bytes, ({
2750                 obj_req->bvec_count++;
2751         }));
2752 }
2753
2754 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2755 {
2756         struct rbd_obj_request *obj_req =
2757             container_of(ex, struct rbd_obj_request, ex);
2758         struct ceph_bvec_iter *it = arg;
2759
2760         ceph_bvec_iter_advance_step(it, bytes, ({
2761                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2762                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2763         }));
2764 }
2765
2766 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2767                                      struct ceph_file_extent *img_extents,
2768                                      u32 num_img_extents,
2769                                      struct ceph_bvec_iter *bvec_pos)
2770 {
2771         struct rbd_img_fill_ctx fctx = {
2772                 .pos_type = OBJ_REQUEST_BVECS,
2773                 .pos = (union rbd_img_fill_iter *)bvec_pos,
2774                 .set_pos_fn = set_bvec_pos,
2775                 .count_fn = count_bvecs,
2776                 .copy_fn = copy_bvecs,
2777         };
2778
2779         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2780                                     &fctx);
2781 }
2782
2783 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2784                                    struct ceph_file_extent *img_extents,
2785                                    u32 num_img_extents,
2786                                    struct bio_vec *bvecs)
2787 {
2788         struct ceph_bvec_iter it = {
2789                 .bvecs = bvecs,
2790                 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2791                                                              num_img_extents) },
2792         };
2793
2794         return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2795                                          &it);
2796 }
2797
2798 static void rbd_img_handle_request_work(struct work_struct *work)
2799 {
2800         struct rbd_img_request *img_req =
2801             container_of(work, struct rbd_img_request, work);
2802
2803         rbd_img_handle_request(img_req, img_req->work_result);
2804 }
2805
2806 static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2807 {
2808         INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2809         img_req->work_result = result;
2810         queue_work(rbd_wq, &img_req->work);
2811 }
2812
2813 static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2814 {
2815         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2816
2817         if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2818                 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2819                 return true;
2820         }
2821
2822         dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2823              obj_req->ex.oe_objno);
2824         return false;
2825 }
2826
2827 static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2828 {
2829         struct ceph_osd_request *osd_req;
2830         int ret;
2831
2832         osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2833         if (IS_ERR(osd_req))
2834                 return PTR_ERR(osd_req);
2835
2836         osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2837                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2838         rbd_osd_setup_data(osd_req, 0);
2839         rbd_osd_format_read(osd_req);
2840
2841         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2842         if (ret)
2843                 return ret;
2844
2845         rbd_osd_submit(osd_req);
2846         return 0;
2847 }
2848
2849 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2850 {
2851         struct rbd_img_request *img_req = obj_req->img_request;
2852         struct rbd_img_request *child_img_req;
2853         int ret;
2854
2855         child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2856                                                OBJ_OP_READ, NULL);
2857         if (!child_img_req)
2858                 return -ENOMEM;
2859
2860         __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2861         child_img_req->obj_request = obj_req;
2862
2863         dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2864              obj_req);
2865
2866         if (!rbd_img_is_write(img_req)) {
2867                 switch (img_req->data_type) {
2868                 case OBJ_REQUEST_BIO:
2869                         ret = __rbd_img_fill_from_bio(child_img_req,
2870                                                       obj_req->img_extents,
2871                                                       obj_req->num_img_extents,
2872                                                       &obj_req->bio_pos);
2873                         break;
2874                 case OBJ_REQUEST_BVECS:
2875                 case OBJ_REQUEST_OWN_BVECS:
2876                         ret = __rbd_img_fill_from_bvecs(child_img_req,
2877                                                       obj_req->img_extents,
2878                                                       obj_req->num_img_extents,
2879                                                       &obj_req->bvec_pos);
2880                         break;
2881                 default:
2882                         BUG();
2883                 }
2884         } else {
2885                 ret = rbd_img_fill_from_bvecs(child_img_req,
2886                                               obj_req->img_extents,
2887                                               obj_req->num_img_extents,
2888                                               obj_req->copyup_bvecs);
2889         }
2890         if (ret) {
2891                 rbd_img_request_put(child_img_req);
2892                 return ret;
2893         }
2894
2895         /* avoid parent chain recursion */
2896         rbd_img_schedule(child_img_req, 0);
2897         return 0;
2898 }
2899
2900 static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2901 {
2902         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2903         int ret;
2904
2905 again:
2906         switch (obj_req->read_state) {
2907         case RBD_OBJ_READ_START:
2908                 rbd_assert(!*result);
2909
2910                 if (!rbd_obj_may_exist(obj_req)) {
2911                         *result = -ENOENT;
2912                         obj_req->read_state = RBD_OBJ_READ_OBJECT;
2913                         goto again;
2914                 }
2915
2916                 ret = rbd_obj_read_object(obj_req);
2917                 if (ret) {
2918                         *result = ret;
2919                         return true;
2920                 }
2921                 obj_req->read_state = RBD_OBJ_READ_OBJECT;
2922                 return false;
2923         case RBD_OBJ_READ_OBJECT:
2924                 if (*result == -ENOENT && rbd_dev->parent_overlap) {
2925                         /* reverse map this object extent onto the parent */
2926                         ret = rbd_obj_calc_img_extents(obj_req, false);
2927                         if (ret) {
2928                                 *result = ret;
2929                                 return true;
2930                         }
2931                         if (obj_req->num_img_extents) {
2932                                 ret = rbd_obj_read_from_parent(obj_req);
2933                                 if (ret) {
2934                                         *result = ret;
2935                                         return true;
2936                                 }
2937                                 obj_req->read_state = RBD_OBJ_READ_PARENT;
2938                                 return false;
2939                         }
2940                 }
2941
2942                 /*
2943                  * -ENOENT means a hole in the image -- zero-fill the entire
2944                  * length of the request.  A short read also implies zero-fill
2945                  * to the end of the request.
2946                  */
2947                 if (*result == -ENOENT) {
2948                         rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
2949                         *result = 0;
2950                 } else if (*result >= 0) {
2951                         if (*result < obj_req->ex.oe_len)
2952                                 rbd_obj_zero_range(obj_req, *result,
2953                                                 obj_req->ex.oe_len - *result);
2954                         else
2955                                 rbd_assert(*result == obj_req->ex.oe_len);
2956                         *result = 0;
2957                 }
2958                 return true;
2959         case RBD_OBJ_READ_PARENT:
2960                 /*
2961                  * The parent image is read only up to the overlap -- zero-fill
2962                  * from the overlap to the end of the request.
2963                  */
2964                 if (!*result) {
2965                         u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
2966
2967                         if (obj_overlap < obj_req->ex.oe_len)
2968                                 rbd_obj_zero_range(obj_req, obj_overlap,
2969                                             obj_req->ex.oe_len - obj_overlap);
2970                 }
2971                 return true;
2972         default:
2973                 BUG();
2974         }
2975 }
2976
2977 static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
2978 {
2979         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2980
2981         if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
2982                 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2983
2984         if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
2985             (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
2986                 dout("%s %p noop for nonexistent\n", __func__, obj_req);
2987                 return true;
2988         }
2989
2990         return false;
2991 }
2992
2993 /*
2994  * Return:
2995  *   0 - object map update sent
2996  *   1 - object map update isn't needed
2997  *  <0 - error
2998  */
2999 static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
3000 {
3001         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3002         u8 new_state;
3003
3004         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3005                 return 1;
3006
3007         if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3008                 new_state = OBJECT_PENDING;
3009         else
3010                 new_state = OBJECT_EXISTS;
3011
3012         return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
3013 }
3014
3015 static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
3016 {
3017         struct ceph_osd_request *osd_req;
3018         int num_ops = count_write_ops(obj_req);
3019         int which = 0;
3020         int ret;
3021
3022         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
3023                 num_ops++; /* stat */
3024
3025         osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3026         if (IS_ERR(osd_req))
3027                 return PTR_ERR(osd_req);
3028
3029         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3030                 ret = rbd_osd_setup_stat(osd_req, which++);
3031                 if (ret)
3032                         return ret;
3033         }
3034
3035         rbd_osd_setup_write_ops(osd_req, which);
3036         rbd_osd_format_write(osd_req);
3037
3038         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3039         if (ret)
3040                 return ret;
3041
3042         rbd_osd_submit(osd_req);
3043         return 0;
3044 }
3045
3046 /*
3047  * copyup_bvecs pages are never highmem pages, so page_address() is safe
3048  */
3049 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
3050 {
3051         struct ceph_bvec_iter it = {
3052                 .bvecs = bvecs,
3053                 .iter = { .bi_size = bytes },
3054         };
3055
3056         ceph_bvec_iter_advance_step(&it, bytes, ({
3057                 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
3058                                bv.bv_len))
3059                         return false;
3060         }));
3061         return true;
3062 }
3063
3064 #define MODS_ONLY       U32_MAX
3065
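/*
 * Send the copyup data with an empty snapshot context so that the OSD
 * deep-copyups the object through all existing snapshots.  The actual
 * modification ops are sent separately with the current snapshot
 * context (see rbd_obj_copyup_current_snapc()).
 */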
3066 static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3067                                       u32 bytes)
3068 {
3069         struct ceph_osd_request *osd_req;
3070         int ret;
3071
3072         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3073         rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3074
3075         osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3076         if (IS_ERR(osd_req))
3077                 return PTR_ERR(osd_req);
3078
3079         ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3080         if (ret)
3081                 return ret;
3082
3083         rbd_osd_format_write(osd_req);
3084
3085         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3086         if (ret)
3087                 return ret;
3088
3089         rbd_osd_submit(osd_req);
3090         return 0;
3091 }
3092
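/*
 * Resend the original modification ops with the current snapshot
 * context, preceded by a copyup op carrying @bytes of parent data
 * unless @bytes is MODS_ONLY.
 */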
3093 static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3094                                         u32 bytes)
3095 {
3096         struct ceph_osd_request *osd_req;
3097         int num_ops = count_write_ops(obj_req);
3098         int which = 0;
3099         int ret;
3100
3101         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3102
3103         if (bytes != MODS_ONLY)
3104                 num_ops++; /* copyup */
3105
3106         osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3107         if (IS_ERR(osd_req))
3108                 return PTR_ERR(osd_req);
3109
3110         if (bytes != MODS_ONLY) {
3111                 ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3112                 if (ret)
3113                         return ret;
3114         }
3115
3116         rbd_osd_setup_write_ops(osd_req, which);
3117         rbd_osd_format_write(osd_req);
3118
3119         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3120         if (ret)
3121                 return ret;
3122
3123         rbd_osd_submit(osd_req);
3124         return 0;
3125 }
3126
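/*
 * Allocate enough page-sized bio_vecs to hold @obj_overlap bytes of
 * parent data for the copyup.
 */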
3127 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3128 {
3129         u32 i;
3130
3131         rbd_assert(!obj_req->copyup_bvecs);
3132         obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3133         obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3134                                         sizeof(*obj_req->copyup_bvecs),
3135                                         GFP_NOIO);
3136         if (!obj_req->copyup_bvecs)
3137                 return -ENOMEM;
3138
3139         for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3140                 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3141
3142                 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3143                 if (!obj_req->copyup_bvecs[i].bv_page)
3144                         return -ENOMEM;
3145
3146                 obj_req->copyup_bvecs[i].bv_offset = 0;
3147                 obj_req->copyup_bvecs[i].bv_len = len;
3148                 obj_overlap -= len;
3149         }
3150
3151         rbd_assert(!obj_overlap);
3152         return 0;
3153 }
3154
3155 /*
3156  * The target object doesn't exist.  Read the data for the entire
3157  * target object up to the overlap point (if any) from the parent,
3158  * so we can use it for a copyup.
3159  */
3160 static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3161 {
3162         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3163         int ret;
3164
3165         rbd_assert(obj_req->num_img_extents);
3166         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3167                       rbd_dev->parent_overlap);
3168         if (!obj_req->num_img_extents) {
3169                 /*
3170                  * The overlap has become 0 (most likely because the
3171                  * image has been flattened).  Re-submit the original write
3172                  * request -- pass MODS_ONLY since the copyup isn't needed
3173                  * anymore.
3174                  */
3175                 return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3176         }
3177
3178         ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3179         if (ret)
3180                 return ret;
3181
3182         return rbd_obj_read_from_parent(obj_req);
3183 }
3184
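/*
 * A copyup brings the object into existence in all existing snapshots
 * as well, so send an object map update for every snapshot in the
 * current snapshot context.  Nothing to do if the object map feature
 * is disabled or the copyup data is all zeroes.
 */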
3185 static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3186 {
3187         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3188         struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3189         u8 new_state;
3190         u32 i;
3191         int ret;
3192
3193         rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3194
3195         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3196                 return;
3197
3198         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3199                 return;
3200
3201         for (i = 0; i < snapc->num_snaps; i++) {
3202                 if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3203                     i + 1 < snapc->num_snaps)
3204                         new_state = OBJECT_EXISTS_CLEAN;
3205                 else
3206                         new_state = OBJECT_EXISTS;
3207
3208                 ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3209                                             new_state, NULL);
3210                 if (ret < 0) {
3211                         obj_req->pending.result = ret;
3212                         return;
3213                 }
3214
3215                 rbd_assert(!ret);
3216                 obj_req->pending.num_pending++;
3217         }
3218 }
3219
3220 static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3221 {
3222         u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3223         int ret;
3224
3225         rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3226
3227         /*
3228          * Only send non-zero copyup data to save some I/O and network
3229          * bandwidth -- zero copyup data is equivalent to the object not
3230          * existing.
3231          */
3232         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3233                 bytes = 0;
3234
3235         if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3236                 /*
3237                  * Send a copyup request with an empty snapshot context to
3238                  * deep-copyup the object through all existing snapshots.
3239                  * A second request with the current snapshot context will be
3240                  * sent for the actual modification.
3241                  */
3242                 ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3243                 if (ret) {
3244                         obj_req->pending.result = ret;
3245                         return;
3246                 }
3247
3248                 obj_req->pending.num_pending++;
3249                 bytes = MODS_ONLY;
3250         }
3251
3252         ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3253         if (ret) {
3254                 obj_req->pending.result = ret;
3255                 return;
3256         }
3257
3258         obj_req->pending.num_pending++;
3259 }
3260
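/*
 * Copyup state machine: read the parent data, update the snapshot
 * object maps and then write the object.  Returns true when the
 * copyup sequence is finished, with *result holding the outcome.
 */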
3261 static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3262 {
3263         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3264         int ret;
3265
3266 again:
3267         switch (obj_req->copyup_state) {
3268         case RBD_OBJ_COPYUP_START:
3269                 rbd_assert(!*result);
3270
3271                 ret = rbd_obj_copyup_read_parent(obj_req);
3272                 if (ret) {
3273                         *result = ret;
3274                         return true;
3275                 }
3276                 if (obj_req->num_img_extents)
3277                         obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3278                 else
3279                         obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3280                 return false;
3281         case RBD_OBJ_COPYUP_READ_PARENT:
3282                 if (*result)
3283                         return true;
3284
3285                 if (is_zero_bvecs(obj_req->copyup_bvecs,
3286                                   rbd_obj_img_extents_bytes(obj_req))) {
3287                         dout("%s %p detected zeros\n", __func__, obj_req);
3288                         obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3289                 }
3290
3291                 rbd_obj_copyup_object_maps(obj_req);
3292                 if (!obj_req->pending.num_pending) {
3293                         *result = obj_req->pending.result;
3294                         obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3295                         goto again;
3296                 }
3297                 obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3298                 return false;
3299         case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3300                 if (!pending_result_dec(&obj_req->pending, result))
3301                         return false;
3302                 /* fall through */
3303         case RBD_OBJ_COPYUP_OBJECT_MAPS:
3304                 if (*result) {
3305                         rbd_warn(rbd_dev, "snap object map update failed: %d",
3306                                  *result);
3307                         return true;
3308                 }
3309
3310                 rbd_obj_copyup_write_object(obj_req);
3311                 if (!obj_req->pending.num_pending) {
3312                         *result = obj_req->pending.result;
3313                         obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3314                         goto again;
3315                 }
3316                 obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3317                 return false;
3318         case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3319                 if (!pending_result_dec(&obj_req->pending, result))
3320                         return false;
3321                 /* fall through */
3322         case RBD_OBJ_COPYUP_WRITE_OBJECT:
3323                 return true;
3324         default:
3325                 BUG();
3326         }
3327 }
3328
3329 /*
3330  * Return:
3331  *   0 - object map update sent
3332  *   1 - object map update isn't needed
3333  *  <0 - error
3334  */
3335 static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3336 {
3337         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3338         u8 current_state = OBJECT_PENDING;
3339
3340         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3341                 return 1;
3342
3343         if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3344                 return 1;
3345
3346         return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3347                                      &current_state);
3348 }
3349
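/*
 * Object write state machine: pre object map update, the write
 * itself, a copyup if the target object turns out not to exist, and
 * the post object map update for deletions.  Returns true when the
 * object request is done.
 */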
3350 static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3351 {
3352         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3353         int ret;
3354
3355 again:
3356         switch (obj_req->write_state) {
3357         case RBD_OBJ_WRITE_START:
3358                 rbd_assert(!*result);
3359
3360                 if (rbd_obj_write_is_noop(obj_req))
3361                         return true;
3362
3363                 ret = rbd_obj_write_pre_object_map(obj_req);
3364                 if (ret < 0) {
3365                         *result = ret;
3366                         return true;
3367                 }
3368                 obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3369                 if (ret > 0)
3370                         goto again;
3371                 return false;
3372         case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3373                 if (*result) {
3374                         rbd_warn(rbd_dev, "pre object map update failed: %d",
3375                                  *result);
3376                         return true;
3377                 }
3378                 ret = rbd_obj_write_object(obj_req);
3379                 if (ret) {
3380                         *result = ret;
3381                         return true;
3382                 }
3383                 obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3384                 return false;
3385         case RBD_OBJ_WRITE_OBJECT:
3386                 if (*result == -ENOENT) {
3387                         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3388                                 *result = 0;
3389                                 obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3390                                 obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3391                                 goto again;
3392                         }
3393                         /*
3394                          * On a non-existent object, delete returns -ENOENT
3395                          * (treat it as success), truncate/zero return 0.
3396                          */
3397                         if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3398                                 *result = 0;
3399                 }
3400                 if (*result)
3401                         return true;
3402
3403                 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3404                 goto again;
3405         case __RBD_OBJ_WRITE_COPYUP:
3406                 if (!rbd_obj_advance_copyup(obj_req, result))
3407                         return false;
3408                 /* fall through */
3409         case RBD_OBJ_WRITE_COPYUP:
3410                 if (*result) {
3411                         rbd_warn(rbd_dev, "copyup failed: %d", *result);
3412                         return true;
3413                 }
3414                 ret = rbd_obj_write_post_object_map(obj_req);
3415                 if (ret < 0) {
3416                         *result = ret;
3417                         return true;
3418                 }
3419                 obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3420                 if (ret > 0)
3421                         goto again;
3422                 return false;
3423         case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3424                 if (*result)
3425                         rbd_warn(rbd_dev, "post object map update failed: %d",
3426                                  *result);
3427                 return true;
3428         default:
3429                 BUG();
3430         }
3431 }
3432
3433 /*
3434  * Return true if @obj_req is completed.
3435  */
3436 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3437                                      int *result)
3438 {
3439         struct rbd_img_request *img_req = obj_req->img_request;
3440         struct rbd_device *rbd_dev = img_req->rbd_dev;
3441         bool done;
3442
3443         mutex_lock(&obj_req->state_mutex);
3444         if (!rbd_img_is_write(img_req))
3445                 done = rbd_obj_advance_read(obj_req, result);
3446         else
3447                 done = rbd_obj_advance_write(obj_req, result);
3448         mutex_unlock(&obj_req->state_mutex);
3449
3450         if (done && *result) {
3451                 rbd_assert(*result < 0);
3452                 rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3453                          obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3454                          obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3455         }
3456         return done;
3457 }
3458
3459 /*
3460  * This is open-coded in rbd_img_handle_request() to avoid parent chain
3461  * recursion.
3462  */
3463 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3464 {
3465         if (__rbd_obj_handle_request(obj_req, &result))
3466                 rbd_img_handle_request(obj_req->img_request, result);
3467 }
3468
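/*
 * The exclusive lock is needed for writes on a read-write mapping of
 * an image with the exclusive-lock feature, and also for reads if
 * lock_on_read is set or the object map feature is enabled.
 */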
3469 static bool need_exclusive_lock(struct rbd_img_request *img_req)
3470 {
3471         struct rbd_device *rbd_dev = img_req->rbd_dev;
3472
3473         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3474                 return false;
3475
3476         if (rbd_is_ro(rbd_dev))
3477                 return false;
3478
3479         rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3480         if (rbd_dev->opts->lock_on_read ||
3481             (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3482                 return true;
3483
3484         return rbd_img_is_write(img_req);
3485 }
3486
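/*
 * Put @img_req on running_list if we currently own the lock, on
 * acquiring_list otherwise.  Returns true if the lock is held and the
 * request may proceed.
 */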
3487 static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3488 {
3489         struct rbd_device *rbd_dev = img_req->rbd_dev;
3490         bool locked;
3491
3492         lockdep_assert_held(&rbd_dev->lock_rwsem);
3493         locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3494         spin_lock(&rbd_dev->lock_lists_lock);
3495         rbd_assert(list_empty(&img_req->lock_item));
3496         if (!locked)
3497                 list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3498         else
3499                 list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3500         spin_unlock(&rbd_dev->lock_lists_lock);
3501         return locked;
3502 }
3503
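/*
 * Take @img_req off its lock list.  If the lock is being released and
 * this was the last running request, wake up the releaser.
 */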
3504 static void rbd_lock_del_request(struct rbd_img_request *img_req)
3505 {
3506         struct rbd_device *rbd_dev = img_req->rbd_dev;
3507         bool need_wakeup;
3508
3509         lockdep_assert_held(&rbd_dev->lock_rwsem);
3510         spin_lock(&rbd_dev->lock_lists_lock);
3511         rbd_assert(!list_empty(&img_req->lock_item));
3512         list_del_init(&img_req->lock_item);
3513         need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3514                        list_empty(&rbd_dev->running_list));
3515         spin_unlock(&rbd_dev->lock_lists_lock);
3516         if (need_wakeup)
3517                 complete(&rbd_dev->releasing_wait);
3518 }
3519
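/*
 * Return:
 *   0 - lock acquisition queued, the request will be kicked from
 *       wake_lock_waiters() once the outcome is known
 *   1 - lock isn't needed or is already held
 *  <0 - error
 */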
3520 static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3521 {
3522         struct rbd_device *rbd_dev = img_req->rbd_dev;
3523
3524         if (!need_exclusive_lock(img_req))
3525                 return 1;
3526
3527         if (rbd_lock_add_request(img_req))
3528                 return 1;
3529
3530         if (rbd_dev->opts->exclusive) {
3531                 WARN_ON(1); /* lock got released? */
3532                 return -EROFS;
3533         }
3534
3535         /*
3536          * Note the use of mod_delayed_work() in rbd_acquire_lock()
3537          * and cancel_delayed_work() in wake_lock_waiters().
3538          */
3539         dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3540         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3541         return 0;
3542 }
3543
3544 static void rbd_img_object_requests(struct rbd_img_request *img_req)
3545 {
3546         struct rbd_obj_request *obj_req;
3547
3548         rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3549
3550         for_each_obj_request(img_req, obj_req) {
3551                 int result = 0;
3552
3553                 if (__rbd_obj_handle_request(obj_req, &result)) {
3554                         if (result) {
3555                                 img_req->pending.result = result;
3556                                 return;
3557                         }
3558                 } else {
3559                         img_req->pending.num_pending++;
3560                 }
3561         }
3562 }
3563
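/*
 * Image request state machine: take the exclusive lock if needed,
 * then kick off the object requests and wait for all of them to
 * complete.  Returns true when the image request is done.
 */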
3564 static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3565 {
3566         struct rbd_device *rbd_dev = img_req->rbd_dev;
3567         int ret;
3568
3569 again:
3570         switch (img_req->state) {
3571         case RBD_IMG_START:
3572                 rbd_assert(!*result);
3573
3574                 ret = rbd_img_exclusive_lock(img_req);
3575                 if (ret < 0) {
3576                         *result = ret;
3577                         return true;
3578                 }
3579                 img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3580                 if (ret > 0)
3581                         goto again;
3582                 return false;
3583         case RBD_IMG_EXCLUSIVE_LOCK:
3584                 if (*result)
3585                         return true;
3586
3587                 rbd_assert(!need_exclusive_lock(img_req) ||
3588                            __rbd_is_lock_owner(rbd_dev));
3589
3590                 rbd_img_object_requests(img_req);
3591                 if (!img_req->pending.num_pending) {
3592                         *result = img_req->pending.result;
3593                         img_req->state = RBD_IMG_OBJECT_REQUESTS;
3594                         goto again;
3595                 }
3596                 img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3597                 return false;
3598         case __RBD_IMG_OBJECT_REQUESTS:
3599                 if (!pending_result_dec(&img_req->pending, result))
3600                         return false;
3601                 /* fall through */
3602         case RBD_IMG_OBJECT_REQUESTS:
3603                 return true;
3604         default:
3605                 BUG();
3606         }
3607 }
3608
3609 /*
3610  * Return true if @img_req is completed.
3611  */
3612 static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3613                                      int *result)
3614 {
3615         struct rbd_device *rbd_dev = img_req->rbd_dev;
3616         bool done;
3617
3618         if (need_exclusive_lock(img_req)) {
3619                 down_read(&rbd_dev->lock_rwsem);
3620                 mutex_lock(&img_req->state_mutex);
3621                 done = rbd_img_advance(img_req, result);
3622                 if (done)
3623                         rbd_lock_del_request(img_req);
3624                 mutex_unlock(&img_req->state_mutex);
3625                 up_read(&rbd_dev->lock_rwsem);
3626         } else {
3627                 mutex_lock(&img_req->state_mutex);
3628                 done = rbd_img_advance(img_req, result);
3629                 mutex_unlock(&img_req->state_mutex);
3630         }
3631
3632         if (done && *result) {
3633                 rbd_assert(*result < 0);
3634                 rbd_warn(rbd_dev, "%s%s result %d",
3635                       test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3636                       obj_op_name(img_req->op_type), *result);
3637         }
3638         return done;
3639 }
3640
3641 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3642 {
3643 again:
3644         if (!__rbd_img_handle_request(img_req, &result))
3645                 return;
3646
3647         if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3648                 struct rbd_obj_request *obj_req = img_req->obj_request;
3649
3650                 rbd_img_request_put(img_req);
3651                 if (__rbd_obj_handle_request(obj_req, &result)) {
3652                         img_req = obj_req->img_request;
3653                         goto again;
3654                 }
3655         } else {
3656                 struct request *rq = img_req->rq;
3657
3658                 rbd_img_request_put(img_req);
3659                 blk_mq_end_request(rq, errno_to_blk_status(result));
3660         }
3661 }
3662
3663 static const struct rbd_client_id rbd_empty_cid;
3664
3665 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3666                           const struct rbd_client_id *rhs)
3667 {
3668         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3669 }
3670
3671 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3672 {
3673         struct rbd_client_id cid;
3674
3675         mutex_lock(&rbd_dev->watch_mutex);
3676         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3677         cid.handle = rbd_dev->watch_cookie;
3678         mutex_unlock(&rbd_dev->watch_mutex);
3679         return cid;
3680 }
3681
3682 /*
3683  * lock_rwsem must be held for write
3684  */
3685 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3686                               const struct rbd_client_id *cid)
3687 {
3688         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3689              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3690              cid->gid, cid->handle);
3691         rbd_dev->owner_cid = *cid; /* struct */
3692 }
3693
3694 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3695 {
3696         mutex_lock(&rbd_dev->watch_mutex);
3697         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3698         mutex_unlock(&rbd_dev->watch_mutex);
3699 }
3700
3701 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3702 {
3703         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3704
3705         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3706         strcpy(rbd_dev->lock_cookie, cookie);
3707         rbd_set_owner_cid(rbd_dev, &cid);
3708         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3709 }
3710
3711 /*
3712  * lock_rwsem must be held for write
3713  */
3714 static int rbd_lock(struct rbd_device *rbd_dev)
3715 {
3716         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3717         char cookie[32];
3718         int ret;
3719
3720         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3721                 rbd_dev->lock_cookie[0] != '\0');
3722
3723         format_lock_cookie(rbd_dev, cookie);
3724         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3725                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3726                             RBD_LOCK_TAG, "", 0);
3727         if (ret)
3728                 return ret;
3729
3730         __rbd_lock(rbd_dev, cookie);
3731         return 0;
3732 }
3733
3734 /*
3735  * lock_rwsem must be held for write
3736  */
3737 static void rbd_unlock(struct rbd_device *rbd_dev)
3738 {
3739         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3740         int ret;
3741
3742         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3743                 rbd_dev->lock_cookie[0] == '\0');
3744
3745         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3746                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
3747         if (ret && ret != -ENOENT)
3748                 rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3749
3750         /* treat errors as the image is unlocked */
3751         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3752         rbd_dev->lock_cookie[0] = '\0';
3753         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3754         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3755 }
3756
3757 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3758                                 enum rbd_notify_op notify_op,
3759                                 struct page ***preply_pages,
3760                                 size_t *preply_len)
3761 {
3762         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3763         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3764         char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3765         int buf_size = sizeof(buf);
3766         void *p = buf;
3767
3768         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3769
3770         /* encode *LockPayload NotifyMessage (op + ClientId) */
3771         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3772         ceph_encode_32(&p, notify_op);
3773         ceph_encode_64(&p, cid.gid);
3774         ceph_encode_64(&p, cid.handle);
3775
3776         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3777                                 &rbd_dev->header_oloc, buf, buf_size,
3778                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3779 }
3780
3781 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3782                                enum rbd_notify_op notify_op)
3783 {
3784         struct page **reply_pages;
3785         size_t reply_len;
3786
3787         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3788         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3789 }
3790
3791 static void rbd_notify_acquired_lock(struct work_struct *work)
3792 {
3793         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3794                                                   acquired_lock_work);
3795
3796         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3797 }
3798
3799 static void rbd_notify_released_lock(struct work_struct *work)
3800 {
3801         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3802                                                   released_lock_work);
3803
3804         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3805 }
3806
3807 static int rbd_request_lock(struct rbd_device *rbd_dev)
3808 {
3809         struct page **reply_pages;
3810         size_t reply_len;
3811         bool lock_owner_responded = false;
3812         int ret;
3813
3814         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3815
3816         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3817                                    &reply_pages, &reply_len);
3818         if (ret && ret != -ETIMEDOUT) {
3819                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3820                 goto out;
3821         }
3822
3823         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3824                 void *p = page_address(reply_pages[0]);
3825                 void *const end = p + reply_len;
3826                 u32 n;
3827
3828                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3829                 while (n--) {
3830                         u8 struct_v;
3831                         u32 len;
3832
3833                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3834                         p += 8 + 8; /* skip gid and cookie */
3835
3836                         ceph_decode_32_safe(&p, end, len, e_inval);
3837                         if (!len)
3838                                 continue;
3839
3840                         if (lock_owner_responded) {
3841                                 rbd_warn(rbd_dev,
3842                                          "duplicate lock owners detected");
3843                                 ret = -EIO;
3844                                 goto out;
3845                         }
3846
3847                         lock_owner_responded = true;
3848                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3849                                                   &struct_v, &len);
3850                         if (ret) {
3851                                 rbd_warn(rbd_dev,
3852                                          "failed to decode ResponseMessage: %d",
3853                                          ret);
3854                                 goto e_inval;
3855                         }
3856
3857                         ret = ceph_decode_32(&p);
3858                 }
3859         }
3860
3861         if (!lock_owner_responded) {
3862                 rbd_warn(rbd_dev, "no lock owners detected");
3863                 ret = -ETIMEDOUT;
3864         }
3865
3866 out:
3867         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3868         return ret;
3869
3870 e_inval:
3871         ret = -EINVAL;
3872         goto out;
3873 }
3874
3875 /*
3876  * Wake up everything waiting for the exclusive lock: either image
3877  * request state machine(s) or rbd_add_acquire_lock() (i.e. "rbd map").
3878  */
3879 static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3880 {
3881         struct rbd_img_request *img_req;
3882
3883         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3884         lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3885
3886         cancel_delayed_work(&rbd_dev->lock_dwork);
3887         if (!completion_done(&rbd_dev->acquire_wait)) {
3888                 rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3889                            list_empty(&rbd_dev->running_list));
3890                 rbd_dev->acquire_err = result;
3891                 complete_all(&rbd_dev->acquire_wait);
3892                 return;
3893         }
3894
3895         list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3896                 mutex_lock(&img_req->state_mutex);
3897                 rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3898                 rbd_img_schedule(img_req, result);
3899                 mutex_unlock(&img_req->state_mutex);
3900         }
3901
3902         list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3903 }
3904
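/*
 * Fetch the current holder(s) of the header lock.  Fails with -EBUSY
 * if the image is locked by an external mechanism (unexpected tag or
 * cookie) or with a shared lock.
 */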
3905 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3906                                struct ceph_locker **lockers, u32 *num_lockers)
3907 {
3908         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3909         u8 lock_type;
3910         char *lock_tag;
3911         int ret;
3912
3913         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3914
3915         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3916                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
3917                                  &lock_type, &lock_tag, lockers, num_lockers);
3918         if (ret)
3919                 return ret;
3920
3921         if (*num_lockers == 0) {
3922                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3923                 goto out;
3924         }
3925
3926         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3927                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3928                          lock_tag);
3929                 ret = -EBUSY;
3930                 goto out;
3931         }
3932
3933         if (lock_type == CEPH_CLS_LOCK_SHARED) {
3934                 rbd_warn(rbd_dev, "shared lock type detected");
3935                 ret = -EBUSY;
3936                 goto out;
3937         }
3938
3939         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3940                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
3941                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3942                          (*lockers)[0].id.cookie);
3943                 ret = -EBUSY;
3944                 goto out;
3945         }
3946
3947 out:
3948         kfree(lock_tag);
3949         return ret;
3950 }
3951
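/*
 * Check whether @locker still has a watch established on the header
 * object (i.e. is still alive).  Returns 1 and records its client id
 * as the owner if so, 0 if the watch is gone, or a negative error.
 */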
3952 static int find_watcher(struct rbd_device *rbd_dev,
3953                         const struct ceph_locker *locker)
3954 {
3955         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3956         struct ceph_watch_item *watchers;
3957         u32 num_watchers;
3958         u64 cookie;
3959         int i;
3960         int ret;
3961
3962         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3963                                       &rbd_dev->header_oloc, &watchers,
3964                                       &num_watchers);
3965         if (ret)
3966                 return ret;
3967
3968         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3969         for (i = 0; i < num_watchers; i++) {
3970                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3971                             sizeof(locker->info.addr)) &&
3972                     watchers[i].cookie == cookie) {
3973                         struct rbd_client_id cid = {
3974                                 .gid = le64_to_cpu(watchers[i].name.num),
3975                                 .handle = cookie,
3976                         };
3977
3978                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3979                              rbd_dev, cid.gid, cid.handle);
3980                         rbd_set_owner_cid(rbd_dev, &cid);
3981                         ret = 1;
3982                         goto out;
3983                 }
3984         }
3985
3986         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3987         ret = 0;
3988 out:
3989         kfree(watchers);
3990         return ret;
3991 }
3992
3993 /*
3994  * lock_rwsem must be held for write
3995  */
3996 static int rbd_try_lock(struct rbd_device *rbd_dev)
3997 {
3998         struct ceph_client *client = rbd_dev->rbd_client->client;
3999         struct ceph_locker *lockers;
4000         u32 num_lockers;
4001         int ret;
4002
4003         for (;;) {
4004                 ret = rbd_lock(rbd_dev);
4005                 if (ret != -EBUSY)
4006                         return ret;
4007
4008                 /* determine if the current lock holder is still alive */
4009                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
4010                 if (ret)
4011                         return ret;
4012
4013                 if (num_lockers == 0)
4014                         goto again;
4015
4016                 ret = find_watcher(rbd_dev, lockers);
4017                 if (ret)
4018                         goto out; /* request lock or error */
4019
4020                 rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
4021                          ENTITY_NAME(lockers[0].id.name));
4022
4023                 ret = ceph_monc_blacklist_add(&client->monc,
4024                                               &lockers[0].info.addr);
4025                 if (ret) {
4026                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
4027                                  ENTITY_NAME(lockers[0].id.name), ret);
4028                         goto out;
4029                 }
4030
4031                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
4032                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
4033                                           lockers[0].id.cookie,
4034                                           &lockers[0].id.name);
4035                 if (ret && ret != -ENOENT)
4036                         goto out;
4037
4038 again:
4039                 ceph_free_lockers(lockers, num_lockers);
4040         }
4041
4042 out:
4043         ceph_free_lockers(lockers, num_lockers);
4044         return ret;
4045 }
4046
4047 static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
4048 {
4049         int ret;
4050
4051         if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4052                 ret = rbd_object_map_open(rbd_dev);
4053                 if (ret)
4054                         return ret;
4055         }
4056
4057         return 0;
4058 }
4059
4060 /*
4061  * Return:
4062  *   0 - lock acquired
4063  *   1 - caller should call rbd_request_lock()
4064  *  <0 - error
4065  */
4066 static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4067 {
4068         int ret;
4069
4070         down_read(&rbd_dev->lock_rwsem);
4071         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4072              rbd_dev->lock_state);
4073         if (__rbd_is_lock_owner(rbd_dev)) {
4074                 up_read(&rbd_dev->lock_rwsem);
4075                 return 0;
4076         }
4077
4078         up_read(&rbd_dev->lock_rwsem);
4079         down_write(&rbd_dev->lock_rwsem);
4080         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4081              rbd_dev->lock_state);
4082         if (__rbd_is_lock_owner(rbd_dev)) {
4083                 up_write(&rbd_dev->lock_rwsem);
4084                 return 0;
4085         }
4086
4087         ret = rbd_try_lock(rbd_dev);
4088         if (ret < 0) {
4089                 rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4090                 if (ret == -EBLACKLISTED)
4091                         goto out;
4092
4093                 ret = 1; /* request lock anyway */
4094         }
4095         if (ret > 0) {
4096                 up_write(&rbd_dev->lock_rwsem);
4097                 return ret;
4098         }
4099
4100         rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4101         rbd_assert(list_empty(&rbd_dev->running_list));
4102
4103         ret = rbd_post_acquire_action(rbd_dev);
4104         if (ret) {
4105                 rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4106                 /*
4107                  * Can't stay in RBD_LOCK_STATE_LOCKED because
4108                  * rbd_lock_add_request() would let the request through,
4109                  * assuming that e.g. object map is locked and loaded.
4110                  */
4111                 rbd_unlock(rbd_dev);
4112         }
4113
4114 out:
4115         wake_lock_waiters(rbd_dev, ret);
4116         up_write(&rbd_dev->lock_rwsem);
4117         return ret;
4118 }
4119
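/*
 * lock_dwork callback: try to acquire the exclusive lock.  If another
 * client holds it, ask it to release the lock and retry (an
 * unresponsive owner is treated as dead).
 */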
4120 static void rbd_acquire_lock(struct work_struct *work)
4121 {
4122         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4123                                             struct rbd_device, lock_dwork);
4124         int ret;
4125
4126         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4127 again:
4128         ret = rbd_try_acquire_lock(rbd_dev);
4129         if (ret <= 0) {
4130                 dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4131                 return;
4132         }
4133
4134         ret = rbd_request_lock(rbd_dev);
4135         if (ret == -ETIMEDOUT) {
4136                 goto again; /* treat this as a dead client */
4137         } else if (ret == -EROFS) {
4138                 rbd_warn(rbd_dev, "peer will not release lock");
4139                 down_write(&rbd_dev->lock_rwsem);
4140                 wake_lock_waiters(rbd_dev, ret);
4141                 up_write(&rbd_dev->lock_rwsem);
4142         } else if (ret < 0) {
4143                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4144                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4145                                  RBD_RETRY_DELAY);
4146         } else {
4147                 /*
4148                  * lock owner acked, but resend if we don't see them
4149                  * release the lock
4150                  */
4151                 dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4152                      rbd_dev);
4153                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4154                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4155         }
4156 }
4157
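/*
 * Transition to RBD_LOCK_STATE_RELEASING and wait for in-flight
 * requests on running_list to drain.  Returns true if the release
 * should proceed.
 */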
4158 static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4159 {
4160         bool need_wait;
4161
4162         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4163         lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4164
4165         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4166                 return false;
4167
4168         /*
4169          * Ensure that all in-flight IO is flushed.
4170          */
4171         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4172         rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4173         need_wait = !list_empty(&rbd_dev->running_list);
4174         downgrade_write(&rbd_dev->lock_rwsem);
4175         if (need_wait)
4176                 wait_for_completion(&rbd_dev->releasing_wait);
4177         up_read(&rbd_dev->lock_rwsem);
4178
4179         down_write(&rbd_dev->lock_rwsem);
4180         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4181                 return false;
4182
4183         rbd_assert(list_empty(&rbd_dev->running_list));
4184         return true;
4185 }
4186
4187 static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4188 {
4189         if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4190                 rbd_object_map_close(rbd_dev);
4191 }
4192
4193 static void __rbd_release_lock(struct rbd_device *rbd_dev)
4194 {
4195         rbd_assert(list_empty(&rbd_dev->running_list));
4196
4197         rbd_pre_release_action(rbd_dev);
4198         rbd_unlock(rbd_dev);
4199 }
4200
4201 /*
4202  * lock_rwsem must be held for write
4203  */
4204 static void rbd_release_lock(struct rbd_device *rbd_dev)
4205 {
4206         if (!rbd_quiesce_lock(rbd_dev))
4207                 return;
4208
4209         __rbd_release_lock(rbd_dev);
4210
4211         /*
4212          * Give others a chance to grab the lock - we would re-acquire
4213          * almost immediately if we got new IO while draining the running
4214          * list otherwise.  We need to ack our own notifications, so this
4215          * lock_dwork will be requeued from rbd_handle_released_lock() by
4216          * way of maybe_kick_acquire().
4217          */
4218         cancel_delayed_work(&rbd_dev->lock_dwork);
4219 }
4220
4221 static void rbd_release_lock_work(struct work_struct *work)
4222 {
4223         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4224                                                   unlock_work);
4225
4226         down_write(&rbd_dev->lock_rwsem);
4227         rbd_release_lock(rbd_dev);
4228         up_write(&rbd_dev->lock_rwsem);
4229 }
4230
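/*
 * If we aren't the lock owner but have requests waiting for the lock
 * (or a lock_dwork retry already pending), kick lock_dwork to try to
 * acquire it now.
 */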
4231 static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4232 {
4233         bool have_requests;
4234
4235         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4236         if (__rbd_is_lock_owner(rbd_dev))
4237                 return;
4238
4239         spin_lock(&rbd_dev->lock_lists_lock);
4240         have_requests = !list_empty(&rbd_dev->acquiring_list);
4241         spin_unlock(&rbd_dev->lock_lists_lock);
4242         if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4243                 dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4244                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4245         }
4246 }
4247
4248 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4249                                      void **p)
4250 {
4251         struct rbd_client_id cid = { 0 };
4252
4253         if (struct_v >= 2) {
4254                 cid.gid = ceph_decode_64(p);
4255                 cid.handle = ceph_decode_64(p);
4256         }
4257
4258         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4259              cid.handle);
4260         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4261                 down_write(&rbd_dev->lock_rwsem);
4262                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4263                         /*
4264                          * we already know that the remote client is
4265                          * the owner
4266                          */
4267                         up_write(&rbd_dev->lock_rwsem);
4268                         return;
4269                 }
4270
4271                 rbd_set_owner_cid(rbd_dev, &cid);
4272                 downgrade_write(&rbd_dev->lock_rwsem);
4273         } else {
4274                 down_read(&rbd_dev->lock_rwsem);
4275         }
4276
4277         maybe_kick_acquire(rbd_dev);
4278         up_read(&rbd_dev->lock_rwsem);
4279 }
4280
4281 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4282                                      void **p)
4283 {
4284         struct rbd_client_id cid = { 0 };
4285
4286         if (struct_v >= 2) {
4287                 cid.gid = ceph_decode_64(p);
4288                 cid.handle = ceph_decode_64(p);
4289         }
4290
4291         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4292              cid.handle);
4293         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4294                 down_write(&rbd_dev->lock_rwsem);
4295                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4296                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
4297                              __func__, rbd_dev, cid.gid, cid.handle,
4298                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4299                         up_write(&rbd_dev->lock_rwsem);
4300                         return;
4301                 }
4302
4303                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4304                 downgrade_write(&rbd_dev->lock_rwsem);
4305         } else {
4306                 down_read(&rbd_dev->lock_rwsem);
4307         }
4308
4309         maybe_kick_acquire(rbd_dev);
4310         up_read(&rbd_dev->lock_rwsem);
4311 }
4312
4313 /*
4314  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4315  * ResponseMessage is needed.
4316  */
4317 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4318                                    void **p)
4319 {
4320         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4321         struct rbd_client_id cid = { 0 };
4322         int result = 1;
4323
4324         if (struct_v >= 2) {
4325                 cid.gid = ceph_decode_64(p);
4326                 cid.handle = ceph_decode_64(p);
4327         }
4328
4329         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4330              cid.handle);
4331         if (rbd_cid_equal(&cid, &my_cid))
4332                 return result;
4333
4334         down_read(&rbd_dev->lock_rwsem);
4335         if (__rbd_is_lock_owner(rbd_dev)) {
4336                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4337                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4338                         goto out_unlock;
4339
4340                 /*
4341                  * encode ResponseMessage(0) so the peer can detect
4342                  * a missing owner
4343                  */
4344                 result = 0;
4345
4346                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4347                         if (!rbd_dev->opts->exclusive) {
4348                                 dout("%s rbd_dev %p queueing unlock_work\n",
4349                                      __func__, rbd_dev);
4350                                 queue_work(rbd_dev->task_wq,
4351                                            &rbd_dev->unlock_work);
4352                         } else {
4353                                 /* refuse to release the lock */
4354                                 result = -EROFS;
4355                         }
4356                 }
4357         }
4358
4359 out_unlock:
4360         up_read(&rbd_dev->lock_rwsem);
4361         return result;
4362 }
4363
4364 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4365                                      u64 notify_id, u64 cookie, s32 *result)
4366 {
4367         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4368         char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4369         int buf_size = sizeof(buf);
4370         int ret;
4371
4372         if (result) {
4373                 void *p = buf;
4374
4375                 /* encode ResponseMessage */
4376                 ceph_start_encoding(&p, 1, 1,
4377                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
4378                 ceph_encode_32(&p, *result);
4379         } else {
4380                 buf_size = 0;
4381         }
4382
4383         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4384                                    &rbd_dev->header_oloc, notify_id, cookie,
4385                                    buf, buf_size);
4386         if (ret)
4387                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4388 }
4389
4390 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4391                                    u64 cookie)
4392 {
4393         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4394         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4395 }
4396
4397 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4398                                           u64 notify_id, u64 cookie, s32 result)
4399 {
4400         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4401         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4402 }
4403
4404 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4405                          u64 notifier_id, void *data, size_t data_len)
4406 {
4407         struct rbd_device *rbd_dev = arg;
4408         void *p = data;
4409         void *const end = p + data_len;
4410         u8 struct_v = 0;
4411         u32 len;
4412         u32 notify_op;
4413         int ret;
4414
4415         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4416              __func__, rbd_dev, cookie, notify_id, data_len);
4417         if (data_len) {
4418                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4419                                           &struct_v, &len);
4420                 if (ret) {
4421                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4422                                  ret);
4423                         return;
4424                 }
4425
4426                 notify_op = ceph_decode_32(&p);
4427         } else {
4428                 /* legacy notification for header updates */
4429                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4430                 len = 0;
4431         }
4432
4433         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4434         switch (notify_op) {
4435         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4436                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4437                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4438                 break;
4439         case RBD_NOTIFY_OP_RELEASED_LOCK:
4440                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
4441                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4442                 break;
4443         case RBD_NOTIFY_OP_REQUEST_LOCK:
4444                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4445                 if (ret <= 0)
4446                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
4447                                                       cookie, ret);
4448                 else
4449                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4450                 break;
4451         case RBD_NOTIFY_OP_HEADER_UPDATE:
4452                 ret = rbd_dev_refresh(rbd_dev);
4453                 if (ret)
4454                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
4455
4456                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4457                 break;
4458         default:
4459                 if (rbd_is_lock_owner(rbd_dev))
4460                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
4461                                                       cookie, -EOPNOTSUPP);
4462                 else
4463                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4464                 break;
4465         }
4466 }
4467
4468 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4469
4470 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4471 {
4472         struct rbd_device *rbd_dev = arg;
4473
4474         rbd_warn(rbd_dev, "encountered watch error: %d", err);
4475
4476         down_write(&rbd_dev->lock_rwsem);
4477         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4478         up_write(&rbd_dev->lock_rwsem);
4479
4480         mutex_lock(&rbd_dev->watch_mutex);
4481         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4482                 __rbd_unregister_watch(rbd_dev);
4483                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4484
4485                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4486         }
4487         mutex_unlock(&rbd_dev->watch_mutex);
4488 }
4489
4490 /*
4491  * watch_mutex must be locked
4492  */
4493 static int __rbd_register_watch(struct rbd_device *rbd_dev)
4494 {
4495         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4496         struct ceph_osd_linger_request *handle;
4497
4498         rbd_assert(!rbd_dev->watch_handle);
4499         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4500
4501         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4502                                  &rbd_dev->header_oloc, rbd_watch_cb,
4503                                  rbd_watch_errcb, rbd_dev);
4504         if (IS_ERR(handle))
4505                 return PTR_ERR(handle);
4506
4507         rbd_dev->watch_handle = handle;
4508         return 0;
4509 }
4510
4511 /*
4512  * watch_mutex must be locked
4513  */
4514 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4515 {
4516         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4517         int ret;
4518
4519         rbd_assert(rbd_dev->watch_handle);
4520         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4521
4522         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4523         if (ret)
4524                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4525
4526         rbd_dev->watch_handle = NULL;
4527 }
4528
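/*
 * Register the header object watch and record its cookie.  The watch
 * must currently be in the UNREGISTERED state.
 */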
4529 static int rbd_register_watch(struct rbd_device *rbd_dev)
4530 {
4531         int ret;
4532
4533         mutex_lock(&rbd_dev->watch_mutex);
4534         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4535         ret = __rbd_register_watch(rbd_dev);
4536         if (ret)
4537                 goto out;
4538
4539         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4540         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4541
4542 out:
4543         mutex_unlock(&rbd_dev->watch_mutex);
4544         return ret;
4545 }
4546
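/* Synchronously cancel all exclusive-lock related work items. */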
4547 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4548 {
4549         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4550
4551         cancel_work_sync(&rbd_dev->acquired_lock_work);
4552         cancel_work_sync(&rbd_dev->released_lock_work);
4553         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4554         cancel_work_sync(&rbd_dev->unlock_work);
4555 }
4556
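/*
 * Tear down the header object watch: cancel lock-related work items,
 * unregister the watch if it is still registered, cancel any pending
 * re-registration and flush outstanding notify callbacks.
 */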
4557 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4558 {
4559         cancel_tasks_sync(rbd_dev);
4560
4561         mutex_lock(&rbd_dev->watch_mutex);
4562         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4563                 __rbd_unregister_watch(rbd_dev);
4564         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4565         mutex_unlock(&rbd_dev->watch_mutex);
4566
4567         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4568         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4569 }
4570
4571 /*
4572  * lock_rwsem must be held for write
4573  */
4574 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4575 {
4576         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4577         char cookie[32];
4578         int ret;
4579
4580         if (!rbd_quiesce_lock(rbd_dev))
4581                 return;
4582
4583         format_lock_cookie(rbd_dev, cookie);
4584         ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4585                                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
4586                                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4587                                   RBD_LOCK_TAG, cookie);
4588         if (ret) {
4589                 if (ret != -EOPNOTSUPP)
4590                         rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4591                                  ret);
4592
4593                 /*
4594                  * Lock cookie cannot be updated on older OSDs, so do
4595                  * a manual release and queue an acquire.
4596                  */
4597                 __rbd_release_lock(rbd_dev);
4598                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4599         } else {
4600                 __rbd_lock(rbd_dev, cookie);
4601                 wake_lock_waiters(rbd_dev, 0);
4602         }
4603 }
4604
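/*
 * Delayed work callback (watch_dwork) that tries to re-establish the
 * header watch after an error.  Transient failures are retried after
 * RBD_RETRY_DELAY; on success the exclusive lock is reacquired if it
 * was held and the header is refreshed.
 */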
4605 static void rbd_reregister_watch(struct work_struct *work)
4606 {
4607         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4608                                             struct rbd_device, watch_dwork);
4609         int ret;
4610
4611         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4612
4613         mutex_lock(&rbd_dev->watch_mutex);
4614         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4615                 mutex_unlock(&rbd_dev->watch_mutex);
4616                 return;
4617         }
4618
4619         ret = __rbd_register_watch(rbd_dev);
4620         if (ret) {
4621                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4622                 if (ret != -EBLACKLISTED && ret != -ENOENT) {
4623                         queue_delayed_work(rbd_dev->task_wq,
4624                                            &rbd_dev->watch_dwork,
4625                                            RBD_RETRY_DELAY);
4626                         mutex_unlock(&rbd_dev->watch_mutex);
4627                         return;
4628                 }
4629
4630                 mutex_unlock(&rbd_dev->watch_mutex);
4631                 down_write(&rbd_dev->lock_rwsem);
4632                 wake_lock_waiters(rbd_dev, ret);
4633                 up_write(&rbd_dev->lock_rwsem);
4634                 return;
4635         }
4636
4637         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4638         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4639         mutex_unlock(&rbd_dev->watch_mutex);
4640
4641         down_write(&rbd_dev->lock_rwsem);
4642         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4643                 rbd_reacquire_lock(rbd_dev);
4644         up_write(&rbd_dev->lock_rwsem);
4645
4646         ret = rbd_dev_refresh(rbd_dev);
4647         if (ret)
4648                 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4649 }
4650
4651 /*
4652  * Synchronous osd object method call.  Returns the number of bytes
4653  * returned in the inbound buffer, or a negative error code.
4654  */
4655 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4656                              struct ceph_object_id *oid,
4657                              struct ceph_object_locator *oloc,
4658                              const char *method_name,
4659                              const void *outbound,
4660                              size_t outbound_size,
4661                              void *inbound,
4662                              size_t inbound_size)
4663 {
4664         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4665         struct page *req_page = NULL;
4666         struct page *reply_page;
4667         int ret;
4668
4669         /*
4670          * Method calls are ultimately read operations.  The result
4671          * should be placed into the inbound buffer provided.  They
4672          * also supply outbound data--parameters for the object
4673          * method.  Currently if this is present it will be a
4674          * snapshot id.
4675          */
4676         if (outbound) {
4677                 if (outbound_size > PAGE_SIZE)
4678                         return -E2BIG;
4679
4680                 req_page = alloc_page(GFP_KERNEL);
4681                 if (!req_page)
4682                         return -ENOMEM;
4683
4684                 memcpy(page_address(req_page), outbound, outbound_size);
4685         }
4686
4687         reply_page = alloc_page(GFP_KERNEL);
4688         if (!reply_page) {
4689                 if (req_page)
4690                         __free_page(req_page);
4691                 return -ENOMEM;
4692         }
4693
4694         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4695                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
4696                              &reply_page, &inbound_size);
4697         if (!ret) {
4698                 memcpy(inbound, page_address(reply_page), inbound_size);
4699                 ret = inbound_size;
4700         }
4701
4702         if (req_page)
4703                 __free_page(req_page);
4704         __free_page(reply_page);
4705         return ret;
4706 }
4707
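/*
 * Per-request work function: translate a block layer request into an
 * rbd image request.  The range is validated against the mapping size,
 * a snapshot context reference is taken for writes, and the resulting
 * image request is kicked off via rbd_img_handle_request().
 */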
4708 static void rbd_queue_workfn(struct work_struct *work)
4709 {
4710         struct request *rq = blk_mq_rq_from_pdu(work);
4711         struct rbd_device *rbd_dev = rq->q->queuedata;
4712         struct rbd_img_request *img_request;
4713         struct ceph_snap_context *snapc = NULL;
4714         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4715         u64 length = blk_rq_bytes(rq);
4716         enum obj_operation_type op_type;
4717         u64 mapping_size;
4718         int result;
4719
4720         switch (req_op(rq)) {
4721         case REQ_OP_DISCARD:
4722                 op_type = OBJ_OP_DISCARD;
4723                 break;
4724         case REQ_OP_WRITE_ZEROES:
4725                 op_type = OBJ_OP_ZEROOUT;
4726                 break;
4727         case REQ_OP_WRITE:
4728                 op_type = OBJ_OP_WRITE;
4729                 break;
4730         case REQ_OP_READ:
4731                 op_type = OBJ_OP_READ;
4732                 break;
4733         default:
4734                 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4735                 result = -EIO;
4736                 goto err;
4737         }
4738
4739         /* Ignore/skip any zero-length requests */
4740
4741         if (!length) {
4742                 dout("%s: zero-length request\n", __func__);
4743                 result = 0;
4744                 goto err_rq;
4745         }
4746
4747         if (op_type != OBJ_OP_READ) {
4748                 if (rbd_is_ro(rbd_dev)) {
4749                         rbd_warn(rbd_dev, "%s on read-only mapping",
4750                                  obj_op_name(op_type));
4751                         result = -EIO;
4752                         goto err;
4753                 }
4754                 rbd_assert(!rbd_is_snap(rbd_dev));
4755         }
4756
4757         if (offset && length > U64_MAX - offset + 1) {
4758                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4759                          length);
4760                 result = -EINVAL;
4761                 goto err_rq;    /* Shouldn't happen */
4762         }
4763
4764         blk_mq_start_request(rq);
4765
4766         down_read(&rbd_dev->header_rwsem);
4767         mapping_size = rbd_dev->mapping.size;
4768         if (op_type != OBJ_OP_READ) {
4769                 snapc = rbd_dev->header.snapc;
4770                 ceph_get_snap_context(snapc);
4771         }
4772         up_read(&rbd_dev->header_rwsem);
4773
4774         if (offset + length > mapping_size) {
4775                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4776                          length, mapping_size);
4777                 result = -EIO;
4778                 goto err_rq;
4779         }
4780
4781         img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
4782         if (!img_request) {
4783                 result = -ENOMEM;
4784                 goto err_rq;
4785         }
4786         img_request->rq = rq;
4787         snapc = NULL; /* img_request consumes a ref */
4788
4789         dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4790              img_request, obj_op_name(op_type), offset, length);
4791
4792         if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4793                 result = rbd_img_fill_nodata(img_request, offset, length);
4794         else
4795                 result = rbd_img_fill_from_bio(img_request, offset, length,
4796                                                rq->bio);
4797         if (result)
4798                 goto err_img_request;
4799
4800         rbd_img_handle_request(img_request, 0);
4801         return;
4802
4803 err_img_request:
4804         rbd_img_request_put(img_request);
4805 err_rq:
4806         if (result)
4807                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4808                          obj_op_name(op_type), length, offset, result);
4809         ceph_put_snap_context(snapc);
4810 err:
4811         blk_mq_end_request(rq, errno_to_blk_status(result));
4812 }
4813
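/*
 * blk-mq ->queue_rq() callback.  All real work happens in process
 * context, so just hand the request's embedded work item to rbd_wq.
 */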
4814 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4815                 const struct blk_mq_queue_data *bd)
4816 {
4817         struct request *rq = bd->rq;
4818         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4819
4820         queue_work(rbd_wq, work);
4821         return BLK_STS_OK;
4822 }
4823
4824 static void rbd_free_disk(struct rbd_device *rbd_dev)
4825 {
4826         blk_cleanup_queue(rbd_dev->disk->queue);
4827         blk_mq_free_tag_set(&rbd_dev->tag_set);
4828         put_disk(rbd_dev->disk);
4829         rbd_dev->disk = NULL;
4830 }
4831
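/*
 * Synchronously read up to @buf_len bytes from the start of the given
 * object into @buf.  Returns the number of bytes read or a negative
 * error code.
 */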
4832 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4833                              struct ceph_object_id *oid,
4834                              struct ceph_object_locator *oloc,
4835                              void *buf, int buf_len)
4836
4837 {
4838         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4839         struct ceph_osd_request *req;
4840         struct page **pages;
4841         int num_pages = calc_pages_for(0, buf_len);
4842         int ret;
4843
4844         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4845         if (!req)
4846                 return -ENOMEM;
4847
4848         ceph_oid_copy(&req->r_base_oid, oid);
4849         ceph_oloc_copy(&req->r_base_oloc, oloc);
4850         req->r_flags = CEPH_OSD_FLAG_READ;
4851
4852         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4853         if (IS_ERR(pages)) {
4854                 ret = PTR_ERR(pages);
4855                 goto out_req;
4856         }
4857
4858         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4859         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4860                                          true);
4861
4862         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4863         if (ret)
4864                 goto out_req;
4865
4866         ceph_osdc_start_request(osdc, req, false);
4867         ret = ceph_osdc_wait_request(osdc, req);
4868         if (ret >= 0)
4869                 ceph_copy_from_page_vector(pages, buf, 0, ret);
4870
4871 out_req:
4872         ceph_osdc_put_request(req);
4873         return ret;
4874 }
4875
4876 /*
4877  * Read the complete header for the given rbd device.  On successful
4878  * return, the rbd_dev->header field will contain up-to-date
4879  * information about the image.
4880  */
4881 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4882 {
4883         struct rbd_image_header_ondisk *ondisk = NULL;
4884         u32 snap_count = 0;
4885         u64 names_size = 0;
4886         u32 want_count;
4887         int ret;
4888
4889         /*
4890          * The complete header will include an array of its 64-bit
4891          * snapshot ids, followed by the names of those snapshots as
4892          * a contiguous block of NUL-terminated strings.  Note that
4893          * the number of snapshots could change by the time we read
4894          * it in, in which case we re-read it.
4895          */
4896         do {
4897                 size_t size;
4898
4899                 kfree(ondisk);
4900
4901                 size = sizeof (*ondisk);
4902                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4903                 size += names_size;
4904                 ondisk = kmalloc(size, GFP_KERNEL);
4905                 if (!ondisk)
4906                         return -ENOMEM;
4907
4908                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4909                                         &rbd_dev->header_oloc, ondisk, size);
4910                 if (ret < 0)
4911                         goto out;
4912                 if ((size_t)ret < size) {
4913                         ret = -ENXIO;
4914                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4915                                 size, ret);
4916                         goto out;
4917                 }
4918                 if (!rbd_dev_ondisk_valid(ondisk)) {
4919                         ret = -ENXIO;
4920                         rbd_warn(rbd_dev, "invalid header");
4921                         goto out;
4922                 }
4923
4924                 names_size = le64_to_cpu(ondisk->snap_names_len);
4925                 want_count = snap_count;
4926                 snap_count = le32_to_cpu(ondisk->snap_count);
4927         } while (snap_count != want_count);
4928
4929         ret = rbd_header_from_disk(rbd_dev, ondisk);
4930 out:
4931         kfree(ondisk);
4932
4933         return ret;
4934 }
4935
4936 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4937 {
4938         sector_t size;
4939
4940         /*
4941          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4942          * try to update its size.  If REMOVING is set, updating size
4943          * is just useless work since the device can't be opened.
4944          */
4945         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4946             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4947                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4948                 dout("setting size to %llu sectors", (unsigned long long)size);
4949                 set_capacity(rbd_dev->disk, size);
4950                 revalidate_disk(rbd_dev->disk);
4951         }
4952 }
4953
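/*
 * Re-read the image header (and parent info for layered images) and
 * update the mapping size, resizing the block device if it changed.
 */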
4954 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
4955 {
4956         u64 mapping_size;
4957         int ret;
4958
4959         down_write(&rbd_dev->header_rwsem);
4960         mapping_size = rbd_dev->mapping.size;
4961
4962         ret = rbd_dev_header_info(rbd_dev);
4963         if (ret)
4964                 goto out;
4965
4966         /*
4967          * If there is a parent, see if it has disappeared due to the
4968          * mapped image getting flattened.
4969          */
4970         if (rbd_dev->parent) {
4971                 ret = rbd_dev_v2_parent_info(rbd_dev);
4972                 if (ret)
4973                         goto out;
4974         }
4975
4976         rbd_assert(!rbd_is_snap(rbd_dev));
4977         rbd_dev->mapping.size = rbd_dev->header.image_size;
4978
4979 out:
4980         up_write(&rbd_dev->header_rwsem);
4981         if (!ret && mapping_size != rbd_dev->mapping.size)
4982                 rbd_dev_update_size(rbd_dev);
4983
4984         return ret;
4985 }
4986
4987 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4988                 unsigned int hctx_idx, unsigned int numa_node)
4989 {
4990         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4991
4992         INIT_WORK(work, rbd_queue_workfn);
4993         return 0;
4994 }
4995
4996 static const struct blk_mq_ops rbd_mq_ops = {
4997         .queue_rq       = rbd_queue_rq,
4998         .init_request   = rbd_init_request,
4999 };
5000
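/*
 * Set up the gendisk, blk-mq tag set and request queue for the mapped
 * image.  Queue limits are derived from the object set size and the
 * alloc_size/trim mapping options.
 */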
5001 static int rbd_init_disk(struct rbd_device *rbd_dev)
5002 {
5003         struct gendisk *disk;
5004         struct request_queue *q;
5005         unsigned int objset_bytes =
5006             rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
5007         int err;
5008
5009         /* create gendisk info */
5010         disk = alloc_disk(single_major ?
5011                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
5012                           RBD_MINORS_PER_MAJOR);
5013         if (!disk)
5014                 return -ENOMEM;
5015
5016         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
5017                  rbd_dev->dev_id);
5018         disk->major = rbd_dev->major;
5019         disk->first_minor = rbd_dev->minor;
5020         if (single_major)
5021                 disk->flags |= GENHD_FL_EXT_DEVT;
5022         disk->fops = &rbd_bd_ops;
5023         disk->private_data = rbd_dev;
5024
5025         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
5026         rbd_dev->tag_set.ops = &rbd_mq_ops;
5027         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
5028         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
5029         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
5030         rbd_dev->tag_set.nr_hw_queues = 1;
5031         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
5032
5033         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
5034         if (err)
5035                 goto out_disk;
5036
5037         q = blk_mq_init_queue(&rbd_dev->tag_set);
5038         if (IS_ERR(q)) {
5039                 err = PTR_ERR(q);
5040                 goto out_tag_set;
5041         }
5042
5043         blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
5044         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
5045
5046         blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
5047         q->limits.max_sectors = queue_max_hw_sectors(q);
5048         blk_queue_max_segments(q, USHRT_MAX);
5049         blk_queue_max_segment_size(q, UINT_MAX);
5050         blk_queue_io_min(q, rbd_dev->opts->alloc_size);
5051         blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
5052
5053         if (rbd_dev->opts->trim) {
5054                 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
5055                 q->limits.discard_granularity = rbd_dev->opts->alloc_size;
5056                 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
5057                 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
5058         }
5059
5060         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
5061                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
5062
5063         /*
5064          * disk_release() expects a queue ref from add_disk() and will
5065          * put it.  Hold an extra ref until add_disk() is called.
5066          */
5067         WARN_ON(!blk_get_queue(q));
5068         disk->queue = q;
5069         q->queuedata = rbd_dev;
5070
5071         rbd_dev->disk = disk;
5072
5073         return 0;
5074 out_tag_set:
5075         blk_mq_free_tag_set(&rbd_dev->tag_set);
5076 out_disk:
5077         put_disk(disk);
5078         return err;
5079 }
5080
5081 /*
5082   sysfs
5083 */
5084
5085 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
5086 {
5087         return container_of(dev, struct rbd_device, dev);
5088 }
5089
5090 static ssize_t rbd_size_show(struct device *dev,
5091                              struct device_attribute *attr, char *buf)
5092 {
5093         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5094
5095         return sprintf(buf, "%llu\n",
5096                 (unsigned long long)rbd_dev->mapping.size);
5097 }
5098
5099 static ssize_t rbd_features_show(struct device *dev,
5100                              struct device_attribute *attr, char *buf)
5101 {
5102         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5103
5104         return sprintf(buf, "0x%016llx\n", rbd_dev->header.features);
5105 }
5106
5107 static ssize_t rbd_major_show(struct device *dev,
5108                               struct device_attribute *attr, char *buf)
5109 {
5110         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5111
5112         if (rbd_dev->major)
5113                 return sprintf(buf, "%d\n", rbd_dev->major);
5114
5115         return sprintf(buf, "(none)\n");
5116 }
5117
5118 static ssize_t rbd_minor_show(struct device *dev,
5119                               struct device_attribute *attr, char *buf)
5120 {
5121         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5122
5123         return sprintf(buf, "%d\n", rbd_dev->minor);
5124 }
5125
5126 static ssize_t rbd_client_addr_show(struct device *dev,
5127                                     struct device_attribute *attr, char *buf)
5128 {
5129         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5130         struct ceph_entity_addr *client_addr =
5131             ceph_client_addr(rbd_dev->rbd_client->client);
5132
5133         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5134                        le32_to_cpu(client_addr->nonce));
5135 }
5136
5137 static ssize_t rbd_client_id_show(struct device *dev,
5138                                   struct device_attribute *attr, char *buf)
5139 {
5140         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5141
5142         return sprintf(buf, "client%lld\n",
5143                        ceph_client_gid(rbd_dev->rbd_client->client));
5144 }
5145
5146 static ssize_t rbd_cluster_fsid_show(struct device *dev,
5147                                      struct device_attribute *attr, char *buf)
5148 {
5149         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5150
5151         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5152 }
5153
5154 static ssize_t rbd_config_info_show(struct device *dev,
5155                                     struct device_attribute *attr, char *buf)
5156 {
5157         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5158
5159         return sprintf(buf, "%s\n", rbd_dev->config_info);
5160 }
5161
5162 static ssize_t rbd_pool_show(struct device *dev,
5163                              struct device_attribute *attr, char *buf)
5164 {
5165         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5166
5167         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5168 }
5169
5170 static ssize_t rbd_pool_id_show(struct device *dev,
5171                              struct device_attribute *attr, char *buf)
5172 {
5173         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5174
5175         return sprintf(buf, "%llu\n",
5176                         (unsigned long long) rbd_dev->spec->pool_id);
5177 }
5178
5179 static ssize_t rbd_pool_ns_show(struct device *dev,
5180                                 struct device_attribute *attr, char *buf)
5181 {
5182         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5183
5184         return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5185 }
5186
5187 static ssize_t rbd_name_show(struct device *dev,
5188                              struct device_attribute *attr, char *buf)
5189 {
5190         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5191
5192         if (rbd_dev->spec->image_name)
5193                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5194
5195         return sprintf(buf, "(unknown)\n");
5196 }
5197
5198 static ssize_t rbd_image_id_show(struct device *dev,
5199                              struct device_attribute *attr, char *buf)
5200 {
5201         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5202
5203         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5204 }
5205
5206 /*
5207  * Shows the name of the currently-mapped snapshot (or
5208  * RBD_SNAP_HEAD_NAME for the base image).
5209  */
5210 static ssize_t rbd_snap_show(struct device *dev,
5211                              struct device_attribute *attr,
5212                              char *buf)
5213 {
5214         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5215
5216         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5217 }
5218
5219 static ssize_t rbd_snap_id_show(struct device *dev,
5220                                 struct device_attribute *attr, char *buf)
5221 {
5222         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5223
5224         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5225 }
5226
5227 /*
5228  * For a v2 image, shows the chain of parent images, separated by empty
5229  * lines.  For v1 images or if there is no parent, shows "(no parent
5230  * image)".
5231  */
5232 static ssize_t rbd_parent_show(struct device *dev,
5233                                struct device_attribute *attr,
5234                                char *buf)
5235 {
5236         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5237         ssize_t count = 0;
5238
5239         if (!rbd_dev->parent)
5240                 return sprintf(buf, "(no parent image)\n");
5241
5242         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5243                 struct rbd_spec *spec = rbd_dev->parent_spec;
5244
5245                 count += sprintf(&buf[count], "%s"
5246                             "pool_id %llu\npool_name %s\n"
5247                             "pool_ns %s\n"
5248                             "image_id %s\nimage_name %s\n"
5249                             "snap_id %llu\nsnap_name %s\n"
5250                             "overlap %llu\n",
5251                             !count ? "" : "\n", /* first? */
5252                             spec->pool_id, spec->pool_name,
5253                             spec->pool_ns ?: "",
5254                             spec->image_id, spec->image_name ?: "(unknown)",
5255                             spec->snap_id, spec->snap_name,
5256                             rbd_dev->parent_overlap);
5257         }
5258
5259         return count;
5260 }
5261
5262 static ssize_t rbd_image_refresh(struct device *dev,
5263                                  struct device_attribute *attr,
5264                                  const char *buf,
5265                                  size_t size)
5266 {
5267         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5268         int ret;
5269
5270         ret = rbd_dev_refresh(rbd_dev);
5271         if (ret)
5272                 return ret;
5273
5274         return size;
5275 }
5276
5277 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5278 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5279 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5280 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5281 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5282 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5283 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5284 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5285 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5286 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5287 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5288 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5289 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5290 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5291 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5292 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5293 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5294
5295 static struct attribute *rbd_attrs[] = {
5296         &dev_attr_size.attr,
5297         &dev_attr_features.attr,
5298         &dev_attr_major.attr,
5299         &dev_attr_minor.attr,
5300         &dev_attr_client_addr.attr,
5301         &dev_attr_client_id.attr,
5302         &dev_attr_cluster_fsid.attr,
5303         &dev_attr_config_info.attr,
5304         &dev_attr_pool.attr,
5305         &dev_attr_pool_id.attr,
5306         &dev_attr_pool_ns.attr,
5307         &dev_attr_name.attr,
5308         &dev_attr_image_id.attr,
5309         &dev_attr_current_snap.attr,
5310         &dev_attr_snap_id.attr,
5311         &dev_attr_parent.attr,
5312         &dev_attr_refresh.attr,
5313         NULL
5314 };
5315
5316 static struct attribute_group rbd_attr_group = {
5317         .attrs = rbd_attrs,
5318 };
5319
5320 static const struct attribute_group *rbd_attr_groups[] = {
5321         &rbd_attr_group,
5322         NULL
5323 };
5324
5325 static void rbd_dev_release(struct device *dev);
5326
5327 static const struct device_type rbd_device_type = {
5328         .name           = "rbd",
5329         .groups         = rbd_attr_groups,
5330         .release        = rbd_dev_release,
5331 };
5332
5333 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5334 {
5335         kref_get(&spec->kref);
5336
5337         return spec;
5338 }
5339
5340 static void rbd_spec_free(struct kref *kref);
5341 static void rbd_spec_put(struct rbd_spec *spec)
5342 {
5343         if (spec)
5344                 kref_put(&spec->kref, rbd_spec_free);
5345 }
5346
5347 static struct rbd_spec *rbd_spec_alloc(void)
5348 {
5349         struct rbd_spec *spec;
5350
5351         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5352         if (!spec)
5353                 return NULL;
5354
5355         spec->pool_id = CEPH_NOPOOL;
5356         spec->snap_id = CEPH_NOSNAP;
5357         kref_init(&spec->kref);
5358
5359         return spec;
5360 }
5361
5362 static void rbd_spec_free(struct kref *kref)
5363 {
5364         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5365
5366         kfree(spec->pool_name);
5367         kfree(spec->pool_ns);
5368         kfree(spec->image_id);
5369         kfree(spec->image_name);
5370         kfree(spec->snap_name);
5371         kfree(spec);
5372 }
5373
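/* Free an rbd_device along with the client, spec and options it holds. */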
5374 static void rbd_dev_free(struct rbd_device *rbd_dev)
5375 {
5376         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5377         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5378
5379         ceph_oid_destroy(&rbd_dev->header_oid);
5380         ceph_oloc_destroy(&rbd_dev->header_oloc);
5381         kfree(rbd_dev->config_info);
5382
5383         rbd_put_client(rbd_dev->rbd_client);
5384         rbd_spec_put(rbd_dev->spec);
5385         kfree(rbd_dev->opts);
5386         kfree(rbd_dev);
5387 }
5388
5389 static void rbd_dev_release(struct device *dev)
5390 {
5391         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5392         bool need_put = !!rbd_dev->opts;
5393
5394         if (need_put) {
5395                 destroy_workqueue(rbd_dev->task_wq);
5396                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5397         }
5398
5399         rbd_dev_free(rbd_dev);
5400
5401         /*
5402          * This is racy, but way better than putting the module outside of
5403          * the release callback.  The race window is pretty small, so
5404          * doing something similar to dm (dm-builtin.c) is overkill.
5405          */
5406         if (need_put)
5407                 module_put(THIS_MODULE);
5408 }
5409
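/*
 * Allocate and initialize the core of an rbd_device: locks, work items,
 * lists and the embedded struct device.  On success the new device
 * takes over the caller's references to @rbdc and @spec.
 */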
5410 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
5411                                            struct rbd_spec *spec)
5412 {
5413         struct rbd_device *rbd_dev;
5414
5415         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5416         if (!rbd_dev)
5417                 return NULL;
5418
5419         spin_lock_init(&rbd_dev->lock);
5420         INIT_LIST_HEAD(&rbd_dev->node);
5421         init_rwsem(&rbd_dev->header_rwsem);
5422
5423         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5424         ceph_oid_init(&rbd_dev->header_oid);
5425         rbd_dev->header_oloc.pool = spec->pool_id;
5426         if (spec->pool_ns) {
5427                 WARN_ON(!*spec->pool_ns);
5428                 rbd_dev->header_oloc.pool_ns =
5429                     ceph_find_or_create_string(spec->pool_ns,
5430                                                strlen(spec->pool_ns));
5431         }
5432
5433         mutex_init(&rbd_dev->watch_mutex);
5434         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5435         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5436
5437         init_rwsem(&rbd_dev->lock_rwsem);
5438         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5439         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5440         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5441         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5442         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5443         spin_lock_init(&rbd_dev->lock_lists_lock);
5444         INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5445         INIT_LIST_HEAD(&rbd_dev->running_list);
5446         init_completion(&rbd_dev->acquire_wait);
5447         init_completion(&rbd_dev->releasing_wait);
5448
5449         spin_lock_init(&rbd_dev->object_map_lock);
5450
5451         rbd_dev->dev.bus = &rbd_bus_type;
5452         rbd_dev->dev.type = &rbd_device_type;
5453         rbd_dev->dev.parent = &rbd_root_dev;
5454         device_initialize(&rbd_dev->dev);
5455
5456         rbd_dev->rbd_client = rbdc;
5457         rbd_dev->spec = spec;
5458
5459         return rbd_dev;
5460 }
5461
5462 /*
5463  * Create a mapping rbd_dev.
5464  */
5465 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5466                                          struct rbd_spec *spec,
5467                                          struct rbd_options *opts)
5468 {
5469         struct rbd_device *rbd_dev;
5470
5471         rbd_dev = __rbd_dev_create(rbdc, spec);
5472         if (!rbd_dev)
5473                 return NULL;
5474
5475         rbd_dev->opts = opts;
5476
5477         /* get an id and fill in device name */
5478         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5479                                          minor_to_rbd_dev_id(1 << MINORBITS),
5480                                          GFP_KERNEL);
5481         if (rbd_dev->dev_id < 0)
5482                 goto fail_rbd_dev;
5483
5484         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5485         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5486                                                    rbd_dev->name);
5487         if (!rbd_dev->task_wq)
5488                 goto fail_dev_id;
5489
5490         /* we have a ref from do_rbd_add() */
5491         __module_get(THIS_MODULE);
5492
5493         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5494         return rbd_dev;
5495
5496 fail_dev_id:
5497         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5498 fail_rbd_dev:
5499         rbd_dev_free(rbd_dev);
5500         return NULL;
5501 }
5502
5503 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5504 {
5505         if (rbd_dev)
5506                 put_device(&rbd_dev->dev);
5507 }
5508
5509 /*
5510  * Get the size and object order for an image snapshot, or if
5511  * snap_id is CEPH_NOSNAP, get this information for the base
5512  * image.
5513  */
5514 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5515                                 u8 *order, u64 *snap_size)
5516 {
5517         __le64 snapid = cpu_to_le64(snap_id);
5518         int ret;
5519         struct {
5520                 u8 order;
5521                 __le64 size;
5522         } __attribute__ ((packed)) size_buf = { 0 };
5523
5524         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5525                                   &rbd_dev->header_oloc, "get_size",
5526                                   &snapid, sizeof(snapid),
5527                                   &size_buf, sizeof(size_buf));
5528         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5529         if (ret < 0)
5530                 return ret;
5531         if (ret < sizeof (size_buf))
5532                 return -ERANGE;
5533
5534         if (order) {
5535                 *order = size_buf.order;
5536                 dout("  order %u", (unsigned int)*order);
5537         }
5538         *snap_size = le64_to_cpu(size_buf.size);
5539
5540         dout("  snap_id 0x%016llx snap_size = %llu\n",
5541                 (unsigned long long)snap_id,
5542                 (unsigned long long)*snap_size);
5543
5544         return 0;
5545 }
5546
5547 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5548 {
5549         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5550                                         &rbd_dev->header.obj_order,
5551                                         &rbd_dev->header.image_size);
5552 }
5553
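/*
 * Fetch the object name prefix via the "get_object_prefix" class
 * method and store it in the in-core header.
 */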
5554 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5555 {
5556         size_t size;
5557         void *reply_buf;
5558         int ret;
5559         void *p;
5560
5561         /* Response will be an encoded string, which includes a length */
5562         size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5563         reply_buf = kzalloc(size, GFP_KERNEL);
5564         if (!reply_buf)
5565                 return -ENOMEM;
5566
5567         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5568                                   &rbd_dev->header_oloc, "get_object_prefix",
5569                                   NULL, 0, reply_buf, size);
5570         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5571         if (ret < 0)
5572                 goto out;
5573
5574         p = reply_buf;
5575         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5576                                                 p + ret, NULL, GFP_NOIO);
5577         ret = 0;
5578
5579         if (IS_ERR(rbd_dev->header.object_prefix)) {
5580                 ret = PTR_ERR(rbd_dev->header.object_prefix);
5581                 rbd_dev->header.object_prefix = NULL;
5582         } else {
5583                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5584         }
5585 out:
5586         kfree(reply_buf);
5587
5588         return ret;
5589 }
5590
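/*
 * Query the feature bits for the given snapshot (or for the base image
 * if snap_id is CEPH_NOSNAP).  Fails with -ENXIO if the image uses
 * incompatible features that this client does not support.
 */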
5591 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5592                                      bool read_only, u64 *snap_features)
5593 {
5594         struct {
5595                 __le64 snap_id;
5596                 u8 read_only;
5597         } features_in;
5598         struct {
5599                 __le64 features;
5600                 __le64 incompat;
5601         } __attribute__ ((packed)) features_buf = { 0 };
5602         u64 unsup;
5603         int ret;
5604
5605         features_in.snap_id = cpu_to_le64(snap_id);
5606         features_in.read_only = read_only;
5607
5608         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5609                                   &rbd_dev->header_oloc, "get_features",
5610                                   &features_in, sizeof(features_in),
5611                                   &features_buf, sizeof(features_buf));
5612         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5613         if (ret < 0)
5614                 return ret;
5615         if (ret < sizeof (features_buf))
5616                 return -ERANGE;
5617
5618         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5619         if (unsup) {
5620                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5621                          unsup);
5622                 return -ENXIO;
5623         }
5624
5625         *snap_features = le64_to_cpu(features_buf.features);
5626
5627         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5628                 (unsigned long long)snap_id,
5629                 (unsigned long long)*snap_features,
5630                 (unsigned long long)le64_to_cpu(features_buf.incompat));
5631
5632         return 0;
5633 }
5634
5635 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5636 {
5637         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5638                                          rbd_is_ro(rbd_dev),
5639                                          &rbd_dev->header.features);
5640 }
5641
5642 /*
5643  * These are generic image flags, but since they are used only for
5644  * the object map, store them in rbd_dev->object_map_flags.
5645  *
5646  * For the same reason, this function is called only on object map
5647  * (re)load and not on header refresh.
5648  */
5649 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5650 {
5651         __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5652         __le64 flags;
5653         int ret;
5654
5655         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5656                                   &rbd_dev->header_oloc, "get_flags",
5657                                   &snapid, sizeof(snapid),
5658                                   &flags, sizeof(flags));
5659         if (ret < 0)
5660                 return ret;
5661         if (ret < sizeof(flags))
5662                 return -EBADMSG;
5663
5664         rbd_dev->object_map_flags = le64_to_cpu(flags);
5665         return 0;
5666 }
5667
5668 struct parent_image_info {
5669         u64             pool_id;
5670         const char      *pool_ns;
5671         const char      *image_id;
5672         u64             snap_id;
5673
5674         bool            has_overlap;
5675         u64             overlap;
5676 };
5677
5678 /*
5679  * The caller is responsible for @pii.
5680  */
5681 static int decode_parent_image_spec(void **p, void *end,
5682                                     struct parent_image_info *pii)
5683 {
5684         u8 struct_v;
5685         u32 struct_len;
5686         int ret;
5687
5688         ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5689                                   &struct_v, &struct_len);
5690         if (ret)
5691                 return ret;
5692
5693         ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5694         pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5695         if (IS_ERR(pii->pool_ns)) {
5696                 ret = PTR_ERR(pii->pool_ns);
5697                 pii->pool_ns = NULL;
5698                 return ret;
5699         }
5700         pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5701         if (IS_ERR(pii->image_id)) {
5702                 ret = PTR_ERR(pii->image_id);
5703                 pii->image_id = NULL;
5704                 return ret;
5705         }
5706         ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5707         return 0;
5708
5709 e_inval:
5710         return -EINVAL;
5711 }
5712
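/*
 * Fetch parent info using the "parent_get" and "parent_overlap_get"
 * class methods.  Returns 1 if the OSD does not support them so that
 * the caller can fall back to the legacy method.
 */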
5713 static int __get_parent_info(struct rbd_device *rbd_dev,
5714                              struct page *req_page,
5715                              struct page *reply_page,
5716                              struct parent_image_info *pii)
5717 {
5718         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5719         size_t reply_len = PAGE_SIZE;
5720         void *p, *end;
5721         int ret;
5722
5723         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5724                              "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5725                              req_page, sizeof(u64), &reply_page, &reply_len);
5726         if (ret)
5727                 return ret == -EOPNOTSUPP ? 1 : ret;
5728
5729         p = page_address(reply_page);
5730         end = p + reply_len;
5731         ret = decode_parent_image_spec(&p, end, pii);
5732         if (ret)
5733                 return ret;
5734
5735         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5736                              "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5737                              req_page, sizeof(u64), &reply_page, &reply_len);
5738         if (ret)
5739                 return ret;
5740
5741         p = page_address(reply_page);
5742         end = p + reply_len;
5743         ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5744         if (pii->has_overlap)
5745                 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5746
5747         return 0;
5748
5749 e_inval:
5750         return -EINVAL;
5751 }
5752
5753 /*
5754  * The caller is responsible for @pii.
5755  */
5756 static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5757                                     struct page *req_page,
5758                                     struct page *reply_page,
5759                                     struct parent_image_info *pii)
5760 {
5761         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5762         size_t reply_len = PAGE_SIZE;
5763         void *p, *end;
5764         int ret;
5765
5766         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5767                              "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5768                              req_page, sizeof(u64), &reply_page, &reply_len);
5769         if (ret)
5770                 return ret;
5771
5772         p = page_address(reply_page);
5773         end = p + reply_len;
5774         ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5775         pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5776         if (IS_ERR(pii->image_id)) {
5777                 ret = PTR_ERR(pii->image_id);
5778                 pii->image_id = NULL;
5779                 return ret;
5780         }
5781         ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5782         pii->has_overlap = true;
5783         ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5784
5785         return 0;
5786
5787 e_inval:
5788         return -EINVAL;
5789 }
5790
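/*
 * Fetch parent image information for the mapped snapshot, falling back
 * to the legacy "get_parent" class method on OSDs that do not support
 * "parent_get".
 */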
5791 static int get_parent_info(struct rbd_device *rbd_dev,
5792                            struct parent_image_info *pii)
5793 {
5794         struct page *req_page, *reply_page;
5795         void *p;
5796         int ret;
5797
5798         req_page = alloc_page(GFP_KERNEL);
5799         if (!req_page)
5800                 return -ENOMEM;
5801
5802         reply_page = alloc_page(GFP_KERNEL);
5803         if (!reply_page) {
5804                 __free_page(req_page);
5805                 return -ENOMEM;
5806         }
5807
5808         p = page_address(req_page);
5809         ceph_encode_64(&p, rbd_dev->spec->snap_id);
5810         ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5811         if (ret > 0)
5812                 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5813                                                pii);
5814
5815         __free_page(req_page);
5816         __free_page(reply_page);
5817         return ret;
5818 }
5819
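/*
 * Refresh the parent (clone) information for a v2 image: record the
 * parent spec on the initial probe and keep parent_overlap up to date,
 * dropping the parent when the image has been flattened.
 */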
5820 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5821 {
5822         struct rbd_spec *parent_spec;
5823         struct parent_image_info pii = { 0 };
5824         int ret;
5825
5826         parent_spec = rbd_spec_alloc();
5827         if (!parent_spec)
5828                 return -ENOMEM;
5829
5830         ret = get_parent_info(rbd_dev, &pii);
5831         if (ret)
5832                 goto out_err;
5833
5834         dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5835              __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5836              pii.has_overlap, pii.overlap);
5837
5838         if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5839                 /*
5840                  * Either the parent never existed, or we have
5841                  * a record of it but the image got flattened so it no
5842                  * longer has a parent.  When the parent of a
5843                  * layered image disappears we immediately set the
5844                  * overlap to 0.  The effect of this is that all new
5845                  * requests will be treated as if the image had no
5846                  * parent.
5847                  *
5848                  * If !pii.has_overlap, the parent image spec is not
5849                  * applicable.  It's there to avoid duplication in each
5850                  * snapshot record.
5851                  */
5852                 if (rbd_dev->parent_overlap) {
5853                         rbd_dev->parent_overlap = 0;
5854                         rbd_dev_parent_put(rbd_dev);
5855                         pr_info("%s: clone image has been flattened\n",
5856                                 rbd_dev->disk->disk_name);
5857                 }
5858
5859                 goto out;       /* No parent?  No problem. */
5860         }
5861
5862         /* The ceph file layout needs to fit pool id in 32 bits */
5863
5864         ret = -EIO;
5865         if (pii.pool_id > (u64)U32_MAX) {
5866                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5867                         (unsigned long long)pii.pool_id, U32_MAX);
5868                 goto out_err;
5869         }
5870
5871         /*
5872          * The parent won't change (except when the clone is
5873          * flattened, which is handled above).  So we only need to
5874          * record the parent spec if we have not already done so.
5875          */
5876         if (!rbd_dev->parent_spec) {
5877                 parent_spec->pool_id = pii.pool_id;
5878                 if (pii.pool_ns && *pii.pool_ns) {
5879                         parent_spec->pool_ns = pii.pool_ns;
5880                         pii.pool_ns = NULL;
5881                 }
5882                 parent_spec->image_id = pii.image_id;
5883                 pii.image_id = NULL;
5884                 parent_spec->snap_id = pii.snap_id;
5885
5886                 rbd_dev->parent_spec = parent_spec;
5887                 parent_spec = NULL;     /* rbd_dev now owns this */
5888         }
5889
5890         /*
5891          * We always update the parent overlap.  If it's zero we issue
5892          * a warning, as we will proceed as if there was no parent.
5893          */
5894         if (!pii.overlap) {
5895                 if (parent_spec) {
5896                         /* refresh, careful to warn just once */
5897                         if (rbd_dev->parent_overlap)
5898                                 rbd_warn(rbd_dev,
5899                                     "clone now standalone (overlap became 0)");
5900                 } else {
5901                         /* initial probe */
5902                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5903                 }
5904         }
5905         rbd_dev->parent_overlap = pii.overlap;
5906
5907 out:
5908         ret = 0;
5909 out_err:
5910         kfree(pii.pool_ns);
5911         kfree(pii.image_id);
5912         rbd_spec_put(parent_spec);
5913         return ret;
5914 }
5915
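/* Fetch the stripe unit and stripe count via "get_stripe_unit_count". */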
5916 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5917 {
5918         struct {
5919                 __le64 stripe_unit;
5920                 __le64 stripe_count;
5921         } __attribute__ ((packed)) striping_info_buf = { 0 };
5922         size_t size = sizeof (striping_info_buf);
5923         void *p;
5924         int ret;
5925
5926         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5927                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
5928                                 NULL, 0, &striping_info_buf, size);
5929         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5930         if (ret < 0)
5931                 return ret;
5932         if (ret < size)
5933                 return -ERANGE;
5934
5935         p = &striping_info_buf;
5936         rbd_dev->header.stripe_unit = ceph_decode_64(&p);
5937         rbd_dev->header.stripe_count = ceph_decode_64(&p);
5938         return 0;
5939 }
5940
5941 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5942 {
5943         __le64 data_pool_id;
5944         int ret;
5945
5946         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5947                                   &rbd_dev->header_oloc, "get_data_pool",
5948                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
5949         if (ret < 0)
5950                 return ret;
5951         if (ret < sizeof(data_pool_id))
5952                 return -EBADMSG;
5953
5954         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5955         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5956         return 0;
5957 }
5958
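/*
 * Look up the image name for rbd_dev's image id in the pool's
 * RBD_DIRECTORY object.  Returns a freshly allocated name, or NULL if
 * it could not be determined.
 */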
5959 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5960 {
5961         CEPH_DEFINE_OID_ONSTACK(oid);
5962         size_t image_id_size;
5963         char *image_id;
5964         void *p;
5965         void *end;
5966         size_t size;
5967         void *reply_buf = NULL;
5968         size_t len = 0;
5969         char *image_name = NULL;
5970         int ret;
5971
5972         rbd_assert(!rbd_dev->spec->image_name);
5973
5974         len = strlen(rbd_dev->spec->image_id);
5975         image_id_size = sizeof (__le32) + len;
5976         image_id = kmalloc(image_id_size, GFP_KERNEL);
5977         if (!image_id)
5978                 return NULL;
5979
5980         p = image_id;
5981         end = image_id + image_id_size;
5982         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5983
5984         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5985         reply_buf = kmalloc(size, GFP_KERNEL);
5986         if (!reply_buf)
5987                 goto out;
5988
5989         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5990         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5991                                   "dir_get_name", image_id, image_id_size,
5992                                   reply_buf, size);
5993         if (ret < 0)
5994                 goto out;
5995         p = reply_buf;
5996         end = reply_buf + ret;
5997
5998         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5999         if (IS_ERR(image_name))
6000                 image_name = NULL;
6001         else
6002                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
6003 out:
6004         kfree(reply_buf);
6005         kfree(image_id);
6006
6007         return image_name;
6008 }
6009
6010 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6011 {
6012         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6013         const char *snap_name;
6014         u32 which = 0;
6015
6016         /* Skip over names until we find the one we are looking for */
6017
6018         snap_name = rbd_dev->header.snap_names;
6019         while (which < snapc->num_snaps) {
6020                 if (!strcmp(name, snap_name))
6021                         return snapc->snaps[which];
6022                 snap_name += strlen(snap_name) + 1;
6023                 which++;
6024         }
6025         return CEPH_NOSNAP;
6026 }
6027
6028 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6029 {
6030         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6031         u32 which;
6032         bool found = false;
6033         u64 snap_id;
6034
6035         for (which = 0; !found && which < snapc->num_snaps; which++) {
6036                 const char *snap_name;
6037
6038                 snap_id = snapc->snaps[which];
6039                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
6040                 if (IS_ERR(snap_name)) {
6041                         /* ignore no-longer existing snapshots */
6042                         if (PTR_ERR(snap_name) == -ENOENT)
6043                                 continue;
6044                         else
6045                                 break;
6046                 }
6047                 found = !strcmp(name, snap_name);
6048                 kfree(snap_name);
6049         }
6050         return found ? snap_id : CEPH_NOSNAP;
6051 }
6052
6053 /*
6054  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
6055  * no snapshot by that name is found, or if an error occurs.
6056  */
6057 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6058 {
6059         if (rbd_dev->image_format == 1)
6060                 return rbd_v1_snap_id_by_name(rbd_dev, name);
6061
6062         return rbd_v2_snap_id_by_name(rbd_dev, name);
6063 }
6064
6065 /*
6066  * An image being mapped will have everything but the snap id.
6067  */
6068 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
6069 {
6070         struct rbd_spec *spec = rbd_dev->spec;
6071
6072         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
6073         rbd_assert(spec->image_id && spec->image_name);
6074         rbd_assert(spec->snap_name);
6075
6076         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
6077                 u64 snap_id;
6078
6079                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
6080                 if (snap_id == CEPH_NOSNAP)
6081                         return -ENOENT;
6082
6083                 spec->snap_id = snap_id;
6084         } else {
6085                 spec->snap_id = CEPH_NOSNAP;
6086         }
6087
6088         return 0;
6089 }
6090
6091 /*
6092  * A parent image will have all ids but none of the names.
6093  *
6094  * All names in an rbd spec are dynamically allocated.  It's OK if we
6095  * can't figure out the name for an image id.
6096  */
6097 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
6098 {
6099         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6100         struct rbd_spec *spec = rbd_dev->spec;
6101         const char *pool_name;
6102         const char *image_name;
6103         const char *snap_name;
6104         int ret;
6105
6106         rbd_assert(spec->pool_id != CEPH_NOPOOL);
6107         rbd_assert(spec->image_id);
6108         rbd_assert(spec->snap_id != CEPH_NOSNAP);
6109
6110         /* Get the pool name; we have to make our own copy of this */
6111
6112         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6113         if (!pool_name) {
6114                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6115                 return -EIO;
6116         }
6117         pool_name = kstrdup(pool_name, GFP_KERNEL);
6118         if (!pool_name)
6119                 return -ENOMEM;
6120
6121         /* Fetch the image name; tolerate failure here */
6122
6123         image_name = rbd_dev_image_name(rbd_dev);
6124         if (!image_name)
6125                 rbd_warn(rbd_dev, "unable to get image name");
6126
6127         /* Fetch the snapshot name */
6128
6129         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6130         if (IS_ERR(snap_name)) {
6131                 ret = PTR_ERR(snap_name);
6132                 goto out_err;
6133         }
6134
6135         spec->pool_name = pool_name;
6136         spec->image_name = image_name;
6137         spec->snap_name = snap_name;
6138
6139         return 0;
6140
6141 out_err:
6142         kfree(image_name);
6143         kfree(pool_name);
6144         return ret;
6145 }
6146
6147 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6148 {
6149         size_t size;
6150         int ret;
6151         void *reply_buf;
6152         void *p;
6153         void *end;
6154         u64 seq;
6155         u32 snap_count;
6156         struct ceph_snap_context *snapc;
6157         u32 i;
6158
6159         /*
6160          * We'll need room for the seq value (maximum snapshot id),
6161          * snapshot count, and array of that many snapshot ids.
6162          * For now we have a fixed upper limit on the number we're
6163          * prepared to receive.
6164          */
6165         size = sizeof (__le64) + sizeof (__le32) +
6166                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
6167         reply_buf = kzalloc(size, GFP_KERNEL);
6168         if (!reply_buf)
6169                 return -ENOMEM;
6170
6171         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6172                                   &rbd_dev->header_oloc, "get_snapcontext",
6173                                   NULL, 0, reply_buf, size);
6174         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6175         if (ret < 0)
6176                 goto out;
6177
6178         p = reply_buf;
6179         end = reply_buf + ret;
6180         ret = -ERANGE;
6181         ceph_decode_64_safe(&p, end, seq, out);
6182         ceph_decode_32_safe(&p, end, snap_count, out);
6183
6184         /*
6185          * Make sure the reported number of snapshot ids wouldn't go
6186          * beyond the end of our buffer.  But before checking that,
6187          * make sure the computed size of the snapshot context we
6188          * allocate is representable in a size_t.
6189          */
6190         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6191                                  / sizeof (u64)) {
6192                 ret = -EINVAL;
6193                 goto out;
6194         }
6195         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6196                 goto out;
6197         ret = 0;
6198
6199         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6200         if (!snapc) {
6201                 ret = -ENOMEM;
6202                 goto out;
6203         }
6204         snapc->seq = seq;
6205         for (i = 0; i < snap_count; i++)
6206                 snapc->snaps[i] = ceph_decode_64(&p);
6207
6208         ceph_put_snap_context(rbd_dev->header.snapc);
6209         rbd_dev->header.snapc = snapc;
6210
6211         dout("  snap context seq = %llu, snap_count = %u\n",
6212                 (unsigned long long)seq, (unsigned int)snap_count);
6213 out:
6214         kfree(reply_buf);
6215
6216         return ret;
6217 }
6218
6219 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6220                                         u64 snap_id)
6221 {
6222         size_t size;
6223         void *reply_buf;
6224         __le64 snapid;
6225         int ret;
6226         void *p;
6227         void *end;
6228         char *snap_name;
6229
6230         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6231         reply_buf = kmalloc(size, GFP_KERNEL);
6232         if (!reply_buf)
6233                 return ERR_PTR(-ENOMEM);
6234
6235         snapid = cpu_to_le64(snap_id);
6236         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6237                                   &rbd_dev->header_oloc, "get_snapshot_name",
6238                                   &snapid, sizeof(snapid), reply_buf, size);
6239         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6240         if (ret < 0) {
6241                 snap_name = ERR_PTR(ret);
6242                 goto out;
6243         }
6244
6245         p = reply_buf;
6246         end = reply_buf + ret;
6247         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6248         if (IS_ERR(snap_name))
6249                 goto out;
6250
6251         dout("  snap_id 0x%016llx snap_name = %s\n",
6252                 (unsigned long long)snap_id, snap_name);
6253 out:
6254         kfree(reply_buf);
6255
6256         return snap_name;
6257 }
6258
6259 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6260 {
6261         bool first_time = rbd_dev->header.object_prefix == NULL;
6262         int ret;
6263
6264         ret = rbd_dev_v2_image_size(rbd_dev);
6265         if (ret)
6266                 return ret;
6267
6268         if (first_time) {
6269                 ret = rbd_dev_v2_header_onetime(rbd_dev);
6270                 if (ret)
6271                         return ret;
6272         }
6273
6274         ret = rbd_dev_v2_snap_context(rbd_dev);
6275         if (ret && first_time) {
6276                 kfree(rbd_dev->header.object_prefix);
6277                 rbd_dev->header.object_prefix = NULL;
6278         }
6279
6280         return ret;
6281 }
6282
6283 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6284 {
6285         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6286
6287         if (rbd_dev->image_format == 1)
6288                 return rbd_dev_v1_header_info(rbd_dev);
6289
6290         return rbd_dev_v2_header_info(rbd_dev);
6291 }
6292
6293 /*
6294  * Skips over white space at *buf, and updates *buf to point to the
6295  * first found non-space character (if any). Returns the length of
6296  * the token (string of non-white space characters) found.  Note
6297  * that *buf must be terminated with '\0'.
6298  */
6299 static inline size_t next_token(const char **buf)
6300 {
6301         /*
6302          * These are the characters that produce nonzero for
6303          * isspace() in the "C" and "POSIX" locales.
6304          */
6305         const char *spaces = " \f\n\r\t\v";
6306
6307         *buf += strspn(*buf, spaces);   /* Find start of token */
6308
6309         return strcspn(*buf, spaces);   /* Return token length */
6310 }
6311
6312 /*
6313  * Finds the next token in *buf, dynamically allocates a buffer big
6314  * enough to hold a copy of it, and copies the token into the new
6315  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
6316  * that a duplicate buffer is created even for a zero-length token.
6317  *
6318  * Returns a pointer to the newly-allocated duplicate, or a null
6319  * pointer if memory for the duplicate was not available.  If
6320  * the lenp argument is a non-null pointer, the length of the token
6321  * (not including the '\0') is returned in *lenp.
6322  *
6323  * If successful, the *buf pointer will be updated to point beyond
6324  * the end of the found token.
6325  *
6326  * Note: uses GFP_KERNEL for allocation.
6327  */
6328 static inline char *dup_token(const char **buf, size_t *lenp)
6329 {
6330         char *dup;
6331         size_t len;
6332
6333         len = next_token(buf);
6334         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6335         if (!dup)
6336                 return NULL;
6337         *(dup + len) = '\0';
6338         *buf += len;
6339
6340         if (lenp)
6341                 *lenp = len;
6342
6343         return dup;
6344 }
6345
6346 static int rbd_parse_param(struct fs_parameter *param,
6347                             struct rbd_parse_opts_ctx *pctx)
6348 {
6349         struct rbd_options *opt = pctx->opts;
6350         struct fs_parse_result result;
6351         struct p_log log = {.prefix = "rbd"};
6352         int token, ret;
6353
6354         ret = ceph_parse_param(param, pctx->copts, NULL);
6355         if (ret != -ENOPARAM)
6356                 return ret;
6357
6358         token = __fs_parse(&log, rbd_parameters, param, &result);
6359         dout("%s fs_parse '%s' token %d\n", __func__, param->key, token);
6360         if (token < 0) {
6361                 if (token == -ENOPARAM)
6362                         return inval_plog(&log, "Unknown parameter '%s'",
6363                                           param->key);
6364                 return token;
6365         }
6366
6367         switch (token) {
6368         case Opt_queue_depth:
6369                 if (result.uint_32 < 1)
6370                         goto out_of_range;
6371                 opt->queue_depth = result.uint_32;
6372                 break;
6373         case Opt_alloc_size:
6374                 if (result.uint_32 < SECTOR_SIZE)
6375                         goto out_of_range;
6376                 if (!is_power_of_2(result.uint_32))
6377                         return inval_plog(&log, "alloc_size must be a power of 2");
6378                 opt->alloc_size = result.uint_32;
6379                 break;
6380         case Opt_lock_timeout:
6381                 /* 0 is "wait forever" (i.e. infinite timeout) */
6382                 if (result.uint_32 > INT_MAX / 1000)
6383                         goto out_of_range;
6384                 opt->lock_timeout = msecs_to_jiffies(result.uint_32 * 1000);
6385                 break;
6386         case Opt_pool_ns:
6387                 kfree(pctx->spec->pool_ns);
6388                 pctx->spec->pool_ns = param->string;
6389                 param->string = NULL;
6390                 break;
6391         case Opt_read_only:
6392                 opt->read_only = true;
6393                 break;
6394         case Opt_read_write:
6395                 opt->read_only = false;
6396                 break;
6397         case Opt_lock_on_read:
6398                 opt->lock_on_read = true;
6399                 break;
6400         case Opt_exclusive:
6401                 opt->exclusive = true;
6402                 break;
6403         case Opt_notrim:
6404                 opt->trim = false;
6405                 break;
6406         default:
6407                 BUG();
6408         }
6409
6410         return 0;
6411
6412 out_of_range:
6413         return inval_plog(&log, "%s out of range", param->key);
6414 }
6415
6416 /*
6417  * This duplicates most of generic_parse_monolithic(), untying it from
6418  * fs_context and skipping standard superblock and security options.
6419  */
6420 static int rbd_parse_options(char *options, struct rbd_parse_opts_ctx *pctx)
6421 {
6422         char *key;
6423         int ret = 0;
6424
6425         dout("%s '%s'\n", __func__, options);
6426         while ((key = strsep(&options, ",")) != NULL) {
6427                 if (*key) {
6428                         struct fs_parameter param = {
6429                                 .key    = key,
6430                                 .type   = fs_value_is_flag,
6431                         };
6432                         char *value = strchr(key, '=');
6433                         size_t v_len = 0;
6434
6435                         if (value) {
6436                                 if (value == key)
6437                                         continue;
6438                                 *value++ = 0;
6439                                 v_len = strlen(value);
6440                                 param.string = kmemdup_nul(value, v_len,
6441                                                            GFP_KERNEL);
6442                                 if (!param.string)
6443                                         return -ENOMEM;
6444                                 param.type = fs_value_is_string;
6445                         }
6446                         param.size = v_len;
6447
6448                         ret = rbd_parse_param(&param, pctx);
6449                         kfree(param.string);
6450                         if (ret)
6451                                 break;
6452                 }
6453         }
6454
6455         return ret;
6456 }
6457
6458 /*
6459  * Parse the options provided for an "rbd add" (i.e., rbd image
6460  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
6461  * and the data written is passed here via a NUL-terminated buffer.
6462  * Returns 0 if successful or an error code otherwise.
6463  *
6464  * The information extracted from these options is recorded in
6465  * the other parameters which return dynamically-allocated
6466  * structures:
6467  *  ceph_opts
6468  *      The address of a pointer that will refer to a ceph options
6469  *      structure.  Caller must release the returned pointer using
6470  *      ceph_destroy_options() when it is no longer needed.
6471  *  rbd_opts
6472  *      Address of an rbd options pointer.  Fully initialized by
6473  *      this function; caller must release with kfree().
6474  *  spec
6475  *      Address of an rbd image specification pointer.  Fully
6476  *      initialized by this function based on parsed options.
6477  *      Caller must release with rbd_spec_put().
6478  *
6479  * The options passed take this form:
6480  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
6481  * where:
6482  *  <mon_addrs>
6483  *      A comma-separated list of one or more monitor addresses.
6484  *      A monitor address is an ip address, optionally followed
6485  *      by a port number (separated by a colon).
6486  *        I.e.:  ip1[:port1][,ip2[:port2]...]
6487  *  <options>
6488  *      A comma-separated list of ceph and/or rbd options.
6489  *  <pool_name>
6490  *      The name of the rados pool containing the rbd image.
6491  *  <image_name>
6492  *      The name of the image in that pool to map.
6493  *  <snap_id>
6494  *      An optional snapshot id.  If provided, the mapping will
6495  *      present data from the image at the time that snapshot was
6496  *      created.  The image head is used if no snapshot id is
6497  *      provided.  Snapshot mappings are always read-only.
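 *
 * For example, mapping the head of an image might be requested with a
 * buffer such as (monitor address, credentials, pool and image names
 * here are purely illustrative):
 *
 *   1.2.3.4:6789 name=admin,secret=<key> mypool myimage -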
6498  */
6499 static int rbd_add_parse_args(const char *buf,
6500                                 struct ceph_options **ceph_opts,
6501                                 struct rbd_options **opts,
6502                                 struct rbd_spec **rbd_spec)
6503 {
6504         size_t len;
6505         char *options;
6506         const char *mon_addrs;
6507         char *snap_name;
6508         size_t mon_addrs_size;
6509         struct rbd_parse_opts_ctx pctx = { 0 };
6510         int ret;
6511
6512         /* The first four tokens are required */
6513
6514         len = next_token(&buf);
6515         if (!len) {
6516                 rbd_warn(NULL, "no monitor address(es) provided");
6517                 return -EINVAL;
6518         }
6519         mon_addrs = buf;
6520         mon_addrs_size = len;
6521         buf += len;
6522
6523         ret = -EINVAL;
6524         options = dup_token(&buf, NULL);
6525         if (!options)
6526                 return -ENOMEM;
6527         if (!*options) {
6528                 rbd_warn(NULL, "no options provided");
6529                 goto out_err;
6530         }
6531
6532         pctx.spec = rbd_spec_alloc();
6533         if (!pctx.spec)
6534                 goto out_mem;
6535
6536         pctx.spec->pool_name = dup_token(&buf, NULL);
6537         if (!pctx.spec->pool_name)
6538                 goto out_mem;
6539         if (!*pctx.spec->pool_name) {
6540                 rbd_warn(NULL, "no pool name provided");
6541                 goto out_err;
6542         }
6543
6544         pctx.spec->image_name = dup_token(&buf, NULL);
6545         if (!pctx.spec->image_name)
6546                 goto out_mem;
6547         if (!*pctx.spec->image_name) {
6548                 rbd_warn(NULL, "no image name provided");
6549                 goto out_err;
6550         }
6551
6552         /*
6553          * Snapshot name is optional; default is to use "-"
6554          * (indicating the head/no snapshot).
6555          */
6556         len = next_token(&buf);
6557         if (!len) {
6558                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6559                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6560         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6561                 ret = -ENAMETOOLONG;
6562                 goto out_err;
6563         }
6564         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6565         if (!snap_name)
6566                 goto out_mem;
6567         *(snap_name + len) = '\0';
6568         pctx.spec->snap_name = snap_name;
6569
6570         pctx.copts = ceph_alloc_options();
6571         if (!pctx.copts)
6572                 goto out_mem;
6573
6574         /* Initialize all rbd options to the defaults */
6575
6576         pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6577         if (!pctx.opts)
6578                 goto out_mem;
6579
6580         pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6581         pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6582         pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6583         pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6584         pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6585         pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6586         pctx.opts->trim = RBD_TRIM_DEFAULT;
6587
6588         ret = ceph_parse_mon_ips(mon_addrs, mon_addrs_size, pctx.copts, NULL);
6589         if (ret)
6590                 goto out_err;
6591
6592         ret = rbd_parse_options(options, &pctx);
6593         if (ret)
6594                 goto out_err;
6595
6596         *ceph_opts = pctx.copts;
6597         *opts = pctx.opts;
6598         *rbd_spec = pctx.spec;
6599         kfree(options);
6600         return 0;
6601
6602 out_mem:
6603         ret = -ENOMEM;
6604 out_err:
6605         kfree(pctx.opts);
6606         ceph_destroy_options(pctx.copts);
6607         rbd_spec_put(pctx.spec);
6608         kfree(options);
6609         return ret;
6610 }
6611
6612 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6613 {
6614         down_write(&rbd_dev->lock_rwsem);
6615         if (__rbd_is_lock_owner(rbd_dev))
6616                 __rbd_release_lock(rbd_dev);
6617         up_write(&rbd_dev->lock_rwsem);
6618 }
6619
6620 /*
6621  * If the wait is interrupted, an error is returned even if the lock
6622  * was successfully acquired.  rbd_dev_image_unlock() will release it
6623  * if needed.
6624  */
6625 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6626 {
6627         long ret;
6628
6629         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6630                 if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6631                         return 0;
6632
6633                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6634                 return -EINVAL;
6635         }
6636
6637         if (rbd_is_ro(rbd_dev))
6638                 return 0;
6639
6640         rbd_assert(!rbd_is_lock_owner(rbd_dev));
6641         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
6642         ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6643                             ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6644         if (ret > 0) {
6645                 ret = rbd_dev->acquire_err;
6646         } else {
6647                 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6648                 if (!ret)
6649                         ret = -ETIMEDOUT;
6650         }
6651
6652         if (ret) {
6653                 rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6654                 return ret;
6655         }
6656
6657         /*
6658          * The lock may have been released by now, unless automatic lock
6659          * transitions are disabled.
6660          */
6661         rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6662         return 0;
6663 }
6664
6665 /*
6666  * An rbd format 2 image has a unique identifier, distinct from the
6667  * name given to it by the user.  Internally, that identifier is
6668  * what's used to specify the names of objects related to the image.
6669  *
6670  * A special "rbd id" object is used to map an rbd image name to its
6671  * id.  If that object doesn't exist, then there is no v2 rbd image
6672  * with the supplied name.
6673  *
6674  * This function will record the given rbd_dev's image_id field if
6675  * it can be determined, and in that case will return 0.  If any
6676  * errors occur a negative errno will be returned and the rbd_dev's
6677  * image_id field will be unchanged (and should be NULL).
6678  */
6679 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6680 {
6681         int ret;
6682         size_t size;
6683         CEPH_DEFINE_OID_ONSTACK(oid);
6684         void *response;
6685         char *image_id;
6686
6687         /*
6688          * When probing a parent image, the image id is already
6689          * known (and the image name likely is not).  There's no
6690          * need to fetch the image id again in this case.  We
6691          * do still need to set the image format though.
6692          */
6693         if (rbd_dev->spec->image_id) {
6694                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6695
6696                 return 0;
6697         }
6698
6699         /*
6700          * First, see if the format 2 image id file exists, and if
6701          * so, get the image's persistent id from it.
6702          */
6703         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6704                                rbd_dev->spec->image_name);
6705         if (ret)
6706                 return ret;
6707
6708         dout("rbd id object name is %s\n", oid.name);
6709
6710         /* Response will be an encoded string, which includes a length */
6711         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6712         response = kzalloc(size, GFP_NOIO);
6713         if (!response) {
6714                 ret = -ENOMEM;
6715                 goto out;
6716         }
6717
6718         /* If it doesn't exist we'll assume it's a format 1 image */
6719
6720         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6721                                   "get_id", NULL, 0,
6722                                   response, size);
6723         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6724         if (ret == -ENOENT) {
6725                 image_id = kstrdup("", GFP_KERNEL);
6726                 ret = image_id ? 0 : -ENOMEM;
6727                 if (!ret)
6728                         rbd_dev->image_format = 1;
6729         } else if (ret >= 0) {
6730                 void *p = response;
6731
6732                 image_id = ceph_extract_encoded_string(&p, p + ret,
6733                                                 NULL, GFP_NOIO);
6734                 ret = PTR_ERR_OR_ZERO(image_id);
6735                 if (!ret)
6736                         rbd_dev->image_format = 2;
6737         }
6738
6739         if (!ret) {
6740                 rbd_dev->spec->image_id = image_id;
6741                 dout("image_id is %s\n", image_id);
6742         }
6743 out:
6744         kfree(response);
6745         ceph_oid_destroy(&oid);
6746         return ret;
6747 }
6748
6749 /*
6750  * Undo whatever state changes are made by the v1 or v2 header info
6751  * call.
6752  */
6753 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6754 {
6755         struct rbd_image_header *header;
6756
6757         rbd_dev_parent_put(rbd_dev);
6758         rbd_object_map_free(rbd_dev);
6759         rbd_dev_mapping_clear(rbd_dev);
6760
6761         /* Free dynamic fields from the header, then zero it out */
6762
6763         header = &rbd_dev->header;
6764         ceph_put_snap_context(header->snapc);
6765         kfree(header->snap_sizes);
6766         kfree(header->snap_names);
6767         kfree(header->object_prefix);
6768         memset(header, 0, sizeof (*header));
6769 }
6770
6771 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6772 {
6773         int ret;
6774
6775         ret = rbd_dev_v2_object_prefix(rbd_dev);
6776         if (ret)
6777                 goto out_err;
6778
6779         /*
6780          * Get and check the features for the image.  Currently the
6781          * features are assumed to never change.
6782          */
6783         ret = rbd_dev_v2_features(rbd_dev);
6784         if (ret)
6785                 goto out_err;
6786
6787         /* If the image supports fancy striping, get its parameters */
6788
6789         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6790                 ret = rbd_dev_v2_striping_info(rbd_dev);
6791                 if (ret < 0)
6792                         goto out_err;
6793         }
6794
6795         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6796                 ret = rbd_dev_v2_data_pool(rbd_dev);
6797                 if (ret)
6798                         goto out_err;
6799         }
6800
6801         rbd_init_layout(rbd_dev);
6802         return 0;
6803
6804 out_err:
6805         rbd_dev->header.features = 0;
6806         kfree(rbd_dev->header.object_prefix);
6807         rbd_dev->header.object_prefix = NULL;
6808         return ret;
6809 }
6810
6811 /*
6812  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6813  * rbd_dev_image_probe() recursion depth, which means it's also the
6814  * length of the already discovered part of the parent chain.
6815  */
6816 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6817 {
6818         struct rbd_device *parent = NULL;
6819         int ret;
6820
6821         if (!rbd_dev->parent_spec)
6822                 return 0;
6823
6824         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6825                 pr_info("parent chain is too long (%d)\n", depth);
6826                 ret = -EINVAL;
6827                 goto out_err;
6828         }
6829
6830         parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6831         if (!parent) {
6832                 ret = -ENOMEM;
6833                 goto out_err;
6834         }
6835
6836         /*
6837          * Images related by parent/child relationships always share
6838          * rbd_client and spec/parent_spec, so bump their refcounts.
6839          */
6840         __rbd_get_client(rbd_dev->rbd_client);
6841         rbd_spec_get(rbd_dev->parent_spec);
6842
6843         __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6844
6845         ret = rbd_dev_image_probe(parent, depth);
6846         if (ret < 0)
6847                 goto out_err;
6848
6849         rbd_dev->parent = parent;
6850         atomic_set(&rbd_dev->parent_ref, 1);
6851         return 0;
6852
6853 out_err:
6854         rbd_dev_unparent(rbd_dev);
6855         rbd_dev_destroy(parent);
6856         return ret;
6857 }
6858
6859 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6860 {
6861         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6862         rbd_free_disk(rbd_dev);
6863         if (!single_major)
6864                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6865 }
6866
6867 /*
6868  * rbd_dev->header_rwsem must be locked for write and will be unlocked
6869  * upon return.
6870  */
6871 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6872 {
6873         int ret;
6874
6875         /* Record our major and minor device numbers. */
6876
6877         if (!single_major) {
6878                 ret = register_blkdev(0, rbd_dev->name);
6879                 if (ret < 0)
6880                         goto err_out_unlock;
6881
6882                 rbd_dev->major = ret;
6883                 rbd_dev->minor = 0;
6884         } else {
6885                 rbd_dev->major = rbd_major;
6886                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6887         }
6888
6889         /* Set up the blkdev mapping. */
6890
6891         ret = rbd_init_disk(rbd_dev);
6892         if (ret)
6893                 goto err_out_blkdev;
6894
6895         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6896         set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
6897
6898         ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6899         if (ret)
6900                 goto err_out_disk;
6901
6902         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6903         up_write(&rbd_dev->header_rwsem);
6904         return 0;
6905
6906 err_out_disk:
6907         rbd_free_disk(rbd_dev);
6908 err_out_blkdev:
6909         if (!single_major)
6910                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6911 err_out_unlock:
6912         up_write(&rbd_dev->header_rwsem);
6913         return ret;
6914 }
6915
6916 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6917 {
6918         struct rbd_spec *spec = rbd_dev->spec;
6919         int ret;
6920
6921         /* Record the header object name for this rbd image. */
6922
6923         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6924         if (rbd_dev->image_format == 1)
6925                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6926                                        spec->image_name, RBD_SUFFIX);
6927         else
6928                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6929                                        RBD_HEADER_PREFIX, spec->image_id);
6930
6931         return ret;
6932 }
6933
6934 static void rbd_print_dne(struct rbd_device *rbd_dev, bool is_snap)
6935 {
6936         if (!is_snap) {
6937                 pr_info("image %s/%s%s%s does not exist\n",
6938                         rbd_dev->spec->pool_name,
6939                         rbd_dev->spec->pool_ns ?: "",
6940                         rbd_dev->spec->pool_ns ? "/" : "",
6941                         rbd_dev->spec->image_name);
6942         } else {
6943                 pr_info("snap %s/%s%s%s@%s does not exist\n",
6944                         rbd_dev->spec->pool_name,
6945                         rbd_dev->spec->pool_ns ?: "",
6946                         rbd_dev->spec->pool_ns ? "/" : "",
6947                         rbd_dev->spec->image_name,
6948                         rbd_dev->spec->snap_name);
6949         }
6950 }
6951
6952 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6953 {
6954         rbd_dev_unprobe(rbd_dev);
6955         if (rbd_dev->opts)
6956                 rbd_unregister_watch(rbd_dev);
6957         rbd_dev->image_format = 0;
6958         kfree(rbd_dev->spec->image_id);
6959         rbd_dev->spec->image_id = NULL;
6960 }
6961
6962 /*
6963  * Probe for the existence of the header object for the given rbd
6964  * device.  If this image is the one being mapped (i.e., not a
6965  * parent), initiate a watch on its header object before using that
6966  * object to get detailed information about the rbd image.
6967  */
6968 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6969 {
6970         bool need_watch = !rbd_is_ro(rbd_dev);
6971         int ret;
6972
6973         /*
6974          * Get the id from the image id object.  Unless there's an
6975          * error, rbd_dev->spec->image_id will be filled in with
6976          * a dynamically-allocated string, and rbd_dev->image_format
6977          * will be set to either 1 or 2.
6978          */
6979         ret = rbd_dev_image_id(rbd_dev);
6980         if (ret)
6981                 return ret;
6982
6983         ret = rbd_dev_header_name(rbd_dev);
6984         if (ret)
6985                 goto err_out_format;
6986
6987         if (need_watch) {
6988                 ret = rbd_register_watch(rbd_dev);
6989                 if (ret) {
6990                         if (ret == -ENOENT)
6991                                 rbd_print_dne(rbd_dev, false);
6992                         goto err_out_format;
6993                 }
6994         }
6995
6996         ret = rbd_dev_header_info(rbd_dev);
6997         if (ret) {
6998                 if (ret == -ENOENT && !need_watch)
6999                         rbd_print_dne(rbd_dev, false);
7000                 goto err_out_watch;
7001         }
7002
7003         /*
7004          * If this image is the one being mapped, we have pool name and
7005          * id, image name and id, and snap name - need to fill snap id.
7006          * Otherwise this is a parent image, identified by pool, image
7007          * and snap ids - need to fill in names for those ids.
7008          */
7009         if (!depth)
7010                 ret = rbd_spec_fill_snap_id(rbd_dev);
7011         else
7012                 ret = rbd_spec_fill_names(rbd_dev);
7013         if (ret) {
7014                 if (ret == -ENOENT)
7015                         rbd_print_dne(rbd_dev, true);
7016                 goto err_out_probe;
7017         }
7018
7019         ret = rbd_dev_mapping_set(rbd_dev);
7020         if (ret)
7021                 goto err_out_probe;
7022
7023         if (rbd_is_snap(rbd_dev) &&
7024             (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
7025                 ret = rbd_object_map_load(rbd_dev);
7026                 if (ret)
7027                         goto err_out_probe;
7028         }
7029
7030         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
7031                 ret = rbd_dev_v2_parent_info(rbd_dev);
7032                 if (ret)
7033                         goto err_out_probe;
7034         }
7035
7036         ret = rbd_dev_probe_parent(rbd_dev, depth);
7037         if (ret)
7038                 goto err_out_probe;
7039
7040         dout("discovered format %u image, header name is %s\n",
7041                 rbd_dev->image_format, rbd_dev->header_oid.name);
7042         return 0;
7043
7044 err_out_probe:
7045         rbd_dev_unprobe(rbd_dev);
7046 err_out_watch:
7047         if (need_watch)
7048                 rbd_unregister_watch(rbd_dev);
7049 err_out_format:
7050         rbd_dev->image_format = 0;
7051         kfree(rbd_dev->spec->image_id);
7052         rbd_dev->spec->image_id = NULL;
7053         return ret;
7054 }
7055
7056 static ssize_t do_rbd_add(struct bus_type *bus,
7057                           const char *buf,
7058                           size_t count)
7059 {
7060         struct rbd_device *rbd_dev = NULL;
7061         struct ceph_options *ceph_opts = NULL;
7062         struct rbd_options *rbd_opts = NULL;
7063         struct rbd_spec *spec = NULL;
7064         struct rbd_client *rbdc;
7065         int rc;
7066
7067         if (!try_module_get(THIS_MODULE))
7068                 return -ENODEV;
7069
7070         /* parse add command */
7071         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
7072         if (rc < 0)
7073                 goto out;
7074
7075         rbdc = rbd_get_client(ceph_opts);
7076         if (IS_ERR(rbdc)) {
7077                 rc = PTR_ERR(rbdc);
7078                 goto err_out_args;
7079         }
7080
7081         /* pick the pool */
7082         rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7083         if (rc < 0) {
7084                 if (rc == -ENOENT)
7085                         pr_info("pool %s does not exist\n", spec->pool_name);
7086                 goto err_out_client;
7087         }
7088         spec->pool_id = (u64)rc;
7089
7090         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7091         if (!rbd_dev) {
7092                 rc = -ENOMEM;
7093                 goto err_out_client;
7094         }
7095         rbdc = NULL;            /* rbd_dev now owns this */
7096         spec = NULL;            /* rbd_dev now owns this */
7097         rbd_opts = NULL;        /* rbd_dev now owns this */
7098
7099         /* if we are mapping a snapshot it will be a read-only mapping */
7100         if (rbd_dev->opts->read_only ||
7101             strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7102                 __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7103
7104         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7105         if (!rbd_dev->config_info) {
7106                 rc = -ENOMEM;
7107                 goto err_out_rbd_dev;
7108         }
7109
7110         down_write(&rbd_dev->header_rwsem);
7111         rc = rbd_dev_image_probe(rbd_dev, 0);
7112         if (rc < 0) {
7113                 up_write(&rbd_dev->header_rwsem);
7114                 goto err_out_rbd_dev;
7115         }
7116
7117         if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7118                 rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7119                          rbd_dev->layout.object_size);
7120                 rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7121         }
7122
7123         rc = rbd_dev_device_setup(rbd_dev);
7124         if (rc)
7125                 goto err_out_image_probe;
7126
7127         rc = rbd_add_acquire_lock(rbd_dev);
7128         if (rc)
7129                 goto err_out_image_lock;
7130
7131         /* Everything's ready.  Announce the disk to the world. */
7132
7133         rc = device_add(&rbd_dev->dev);
7134         if (rc)
7135                 goto err_out_image_lock;
7136
7137         device_add_disk(&rbd_dev->dev, rbd_dev->disk, NULL);
7138         /* see rbd_init_disk() */
7139         blk_put_queue(rbd_dev->disk->queue);
7140
7141         spin_lock(&rbd_dev_list_lock);
7142         list_add_tail(&rbd_dev->node, &rbd_dev_list);
7143         spin_unlock(&rbd_dev_list_lock);
7144
7145         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7146                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7147                 rbd_dev->header.features);
7148         rc = count;
7149 out:
7150         module_put(THIS_MODULE);
7151         return rc;
7152
7153 err_out_image_lock:
7154         rbd_dev_image_unlock(rbd_dev);
7155         rbd_dev_device_release(rbd_dev);
7156 err_out_image_probe:
7157         rbd_dev_image_release(rbd_dev);
7158 err_out_rbd_dev:
7159         rbd_dev_destroy(rbd_dev);
7160 err_out_client:
7161         rbd_put_client(rbdc);
7162 err_out_args:
7163         rbd_spec_put(spec);
7164         kfree(rbd_opts);
7165         goto out;
7166 }
7167
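/*
 * With single_major=N (the default), each mapping registers its own
 * block device major via a write to /sys/bus/rbd/add.  With
 * single_major=Y that attribute is disabled and mappings must be
 * created through /sys/bus/rbd/add_single_major instead, so that all
 * rbd devices share the single major registered at module load time.
 */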
7168 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
7169 {
7170         if (single_major)
7171                 return -EINVAL;
7172
7173         return do_rbd_add(bus, buf, count);
7174 }
7175
7176 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
7177                                       size_t count)
7178 {
7179         return do_rbd_add(bus, buf, count);
7180 }
7181
7182 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7183 {
7184         while (rbd_dev->parent) {
7185                 struct rbd_device *first = rbd_dev;
7186                 struct rbd_device *second = first->parent;
7187                 struct rbd_device *third;
7188
7189                 /*
7190                  * Follow to the parent with no grandparent and
7191                  * remove it.
7192                  */
7193                 while (second && (third = second->parent)) {
7194                         first = second;
7195                         second = third;
7196                 }
7197                 rbd_assert(second);
7198                 rbd_dev_image_release(second);
7199                 rbd_dev_destroy(second);
7200                 first->parent = NULL;
7201                 first->parent_overlap = 0;
7202
7203                 rbd_assert(first->parent_spec);
7204                 rbd_spec_put(first->parent_spec);
7205                 first->parent_spec = NULL;
7206         }
7207 }
7208
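/*
 * Handle a write to /sys/bus/rbd/remove (or remove_single_major).
 * The buffer is expected to hold the device id, optionally followed
 * by "force" to tear the mapping down even while it is still open,
 * e.g. "0" or "0 force".
 */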
7209 static ssize_t do_rbd_remove(struct bus_type *bus,
7210                              const char *buf,
7211                              size_t count)
7212 {
7213         struct rbd_device *rbd_dev = NULL;
7214         struct list_head *tmp;
7215         int dev_id;
7216         char opt_buf[6];
7217         bool force = false;
7218         int ret;
7219
7220         dev_id = -1;
7221         opt_buf[0] = '\0';
7222         sscanf(buf, "%d %5s", &dev_id, opt_buf);
7223         if (dev_id < 0) {
7224                 pr_err("dev_id out of range\n");
7225                 return -EINVAL;
7226         }
7227         if (opt_buf[0] != '\0') {
7228                 if (!strcmp(opt_buf, "force")) {
7229                         force = true;
7230                 } else {
7231                         pr_err("bad remove option at '%s'\n", opt_buf);
7232                         return -EINVAL;
7233                 }
7234         }
7235
7236         ret = -ENOENT;
7237         spin_lock(&rbd_dev_list_lock);
7238         list_for_each(tmp, &rbd_dev_list) {
7239                 rbd_dev = list_entry(tmp, struct rbd_device, node);
7240                 if (rbd_dev->dev_id == dev_id) {
7241                         ret = 0;
7242                         break;
7243                 }
7244         }
7245         if (!ret) {
7246                 spin_lock_irq(&rbd_dev->lock);
7247                 if (rbd_dev->open_count && !force)
7248                         ret = -EBUSY;
7249                 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7250                                           &rbd_dev->flags))
7251                         ret = -EINPROGRESS;
7252                 spin_unlock_irq(&rbd_dev->lock);
7253         }
7254         spin_unlock(&rbd_dev_list_lock);
7255         if (ret)
7256                 return ret;
7257
7258         if (force) {
7259                 /*
7260                  * Prevent new IO from being queued and wait for existing
7261                  * IO to complete/fail.
7262                  */
7263                 blk_mq_freeze_queue(rbd_dev->disk->queue);
7264                 blk_set_queue_dying(rbd_dev->disk->queue);
7265         }
7266
7267         del_gendisk(rbd_dev->disk);
7268         spin_lock(&rbd_dev_list_lock);
7269         list_del_init(&rbd_dev->node);
7270         spin_unlock(&rbd_dev_list_lock);
7271         device_del(&rbd_dev->dev);
7272
7273         rbd_dev_image_unlock(rbd_dev);
7274         rbd_dev_device_release(rbd_dev);
7275         rbd_dev_image_release(rbd_dev);
7276         rbd_dev_destroy(rbd_dev);
7277         return count;
7278 }
7279
7280 static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
7281 {
7282         if (single_major)
7283                 return -EINVAL;
7284
7285         return do_rbd_remove(bus, buf, count);
7286 }
7287
7288 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
7289                                          size_t count)
7290 {
7291         return do_rbd_remove(bus, buf, count);
7292 }
7293
7294 /*
7295  * create control files in sysfs
7296  * /sys/bus/rbd/...
7297  */
7298 static int __init rbd_sysfs_init(void)
7299 {
7300         int ret;
7301
7302         ret = device_register(&rbd_root_dev);
7303         if (ret < 0)
7304                 return ret;
7305
7306         ret = bus_register(&rbd_bus_type);
7307         if (ret < 0)
7308                 device_unregister(&rbd_root_dev);
7309
7310         return ret;
7311 }
7312
7313 static void __exit rbd_sysfs_cleanup(void)
7314 {
7315         bus_unregister(&rbd_bus_type);
7316         device_unregister(&rbd_root_dev);
7317 }
7318
7319 static int __init rbd_slab_init(void)
7320 {
7321         rbd_assert(!rbd_img_request_cache);
7322         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7323         if (!rbd_img_request_cache)
7324                 return -ENOMEM;
7325
7326         rbd_assert(!rbd_obj_request_cache);
7327         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7328         if (!rbd_obj_request_cache)
7329                 goto out_err;
7330
7331         return 0;
7332
7333 out_err:
7334         kmem_cache_destroy(rbd_img_request_cache);
7335         rbd_img_request_cache = NULL;
7336         return -ENOMEM;
7337 }
7338
7339 static void rbd_slab_exit(void)
7340 {
7341         rbd_assert(rbd_obj_request_cache);
7342         kmem_cache_destroy(rbd_obj_request_cache);
7343         rbd_obj_request_cache = NULL;
7344
7345         rbd_assert(rbd_img_request_cache);
7346         kmem_cache_destroy(rbd_img_request_cache);
7347         rbd_img_request_cache = NULL;
7348 }
7349
7350 static int __init rbd_init(void)
7351 {
7352         int rc;
7353
7354         if (!libceph_compatible(NULL)) {
7355                 rbd_warn(NULL, "libceph incompatibility (quitting)");
7356                 return -EINVAL;
7357         }
7358
7359         rc = rbd_slab_init();
7360         if (rc)
7361                 return rc;
7362
7363         /*
7364          * The number of active work items is limited by the number of
7365          * rbd devices * queue depth, so leave @max_active at default.
7366          */
7367         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7368         if (!rbd_wq) {
7369                 rc = -ENOMEM;
7370                 goto err_out_slab;
7371         }
7372
7373         if (single_major) {
7374                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
7375                 if (rbd_major < 0) {
7376                         rc = rbd_major;
7377                         goto err_out_wq;
7378                 }
7379         }
7380
7381         rc = rbd_sysfs_init();
7382         if (rc)
7383                 goto err_out_blkdev;
7384
7385         if (single_major)
7386                 pr_info("loaded (major %d)\n", rbd_major);
7387         else
7388                 pr_info("loaded\n");
7389
7390         return 0;
7391
7392 err_out_blkdev:
7393         if (single_major)
7394                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7395 err_out_wq:
7396         destroy_workqueue(rbd_wq);
7397 err_out_slab:
7398         rbd_slab_exit();
7399         return rc;
7400 }
7401
7402 static void __exit rbd_exit(void)
7403 {
7404         ida_destroy(&rbd_dev_id_ida);
7405         rbd_sysfs_cleanup();
7406         if (single_major)
7407                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7408         destroy_workqueue(rbd_wq);
7409         rbd_slab_exit();
7410 }
7411
7412 module_init(rbd_init);
7413 module_exit(rbd_exit);
7414
7415 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7416 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7417 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7418 /* following authorship retained from original osdblk.c */
7419 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7420
7421 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7422 MODULE_LICENSE("GPL");