rbd: treat images mapped read-only seriously
[linux-2.6-microblaze.git] / drivers / block / rbd.c
1
2 /*
3    rbd.c -- Export ceph rados objects as a Linux block device
4
5
6    based on drivers/block/osdblk.c:
7
8    Copyright 2009 Red Hat, Inc.
9
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program; see the file COPYING.  If not, write to
21    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25    For usage instructions, please refer to:
26
27                  Documentation/ABI/testing/sysfs-bus-rbd
28
29  */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/cls_lock_client.h>
35 #include <linux/ceph/striper.h>
36 #include <linux/ceph/decode.h>
37 #include <linux/parser.h>
38 #include <linux/bsearch.h>
39
40 #include <linux/kernel.h>
41 #include <linux/device.h>
42 #include <linux/module.h>
43 #include <linux/blk-mq.h>
44 #include <linux/fs.h>
45 #include <linux/blkdev.h>
46 #include <linux/slab.h>
47 #include <linux/idr.h>
48 #include <linux/workqueue.h>
49
50 #include "rbd_types.h"
51
52 #define RBD_DEBUG       /* Activate rbd_assert() calls */
53
54 /*
55  * Increment the given counter and return its previous value.
56  * If the counter is already 0 it will not be incremented.
57  * If the counter is already at its maximum value, -EINVAL is
58  * returned without updating it.
59  */
60 static int atomic_inc_return_safe(atomic_t *v)
61 {
62         unsigned int counter;
63
64         counter = (unsigned int)atomic_fetch_add_unless(v, 1, 0);
65         if (counter <= (unsigned int)INT_MAX)
66                 return (int)counter;
67
68         atomic_dec(v);
69
70         return -EINVAL;
71 }
72
73 /* Decrement the counter.  Return the resulting value, or -EINVAL */
74 static int atomic_dec_return_safe(atomic_t *v)
75 {
76         int counter;
77
78         counter = atomic_dec_return(v);
79         if (counter >= 0)
80                 return counter;
81
82         atomic_inc(v);
83
84         return -EINVAL;
85 }
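
/*
 * Illustrative usage sketch (not part of the driver): these helpers suit
 * reference-count-style counters such as rbd_dev->parent_ref, where a
 * value of 0 means "no references" and must not be resurrected:
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0)
 *		use_the_parent();	(got a reference)
 *	else
 *		back_off();		(counter was 0 or saturated)
 *
 * use_the_parent() and back_off() are hypothetical placeholders.
 */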
86
87 #define RBD_DRV_NAME "rbd"
88
89 #define RBD_MINORS_PER_MAJOR            256
90 #define RBD_SINGLE_MAJOR_PART_SHIFT     4
91
92 #define RBD_MAX_PARENT_CHAIN_LEN        16
93
94 #define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
95 #define RBD_MAX_SNAP_NAME_LEN   \
96                         (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
97
98 #define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */
99
100 #define RBD_SNAP_HEAD_NAME      "-"
101
102 #define BAD_SNAP_INDEX  U32_MAX         /* invalid index into snap array */
103
104 /* This allows a single page to hold an image name sent by the OSD */
105 #define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
106 #define RBD_IMAGE_ID_LEN_MAX    64
107
108 #define RBD_OBJ_PREFIX_LEN_MAX  64
109
110 #define RBD_NOTIFY_TIMEOUT      5       /* seconds */
111 #define RBD_RETRY_DELAY         msecs_to_jiffies(1000)
112
113 /* Feature bits */
114
115 #define RBD_FEATURE_LAYERING            (1ULL<<0)
116 #define RBD_FEATURE_STRIPINGV2          (1ULL<<1)
117 #define RBD_FEATURE_EXCLUSIVE_LOCK      (1ULL<<2)
118 #define RBD_FEATURE_OBJECT_MAP          (1ULL<<3)
119 #define RBD_FEATURE_FAST_DIFF           (1ULL<<4)
120 #define RBD_FEATURE_DEEP_FLATTEN        (1ULL<<5)
121 #define RBD_FEATURE_DATA_POOL           (1ULL<<7)
122 #define RBD_FEATURE_OPERATIONS          (1ULL<<8)
123
124 #define RBD_FEATURES_ALL        (RBD_FEATURE_LAYERING |         \
125                                  RBD_FEATURE_STRIPINGV2 |       \
126                                  RBD_FEATURE_EXCLUSIVE_LOCK |   \
127                                  RBD_FEATURE_OBJECT_MAP |       \
128                                  RBD_FEATURE_FAST_DIFF |        \
129                                  RBD_FEATURE_DEEP_FLATTEN |     \
130                                  RBD_FEATURE_DATA_POOL |        \
131                                  RBD_FEATURE_OPERATIONS)
132
133 /* Features supported by this (client software) implementation. */
134
135 #define RBD_FEATURES_SUPPORTED  (RBD_FEATURES_ALL)
136
137 /*
138  * An RBD device name will be "rbd#", where the "rbd" comes from
139  * RBD_DRV_NAME above, and # is a unique integer identifier.
140  */
141 #define DEV_NAME_LEN            32
142
143 /*
144  * block device image metadata (in-memory version)
145  */
146 struct rbd_image_header {
147         /* These six fields never change for a given rbd image */
148         char *object_prefix;
149         __u8 obj_order;
150         u64 stripe_unit;
151         u64 stripe_count;
152         s64 data_pool_id;
153         u64 features;           /* Might be changeable someday? */
154
155         /* The remaining fields need to be updated occasionally */
156         u64 image_size;
157         struct ceph_snap_context *snapc;
158         char *snap_names;       /* format 1 only */
159         u64 *snap_sizes;        /* format 1 only */
160 };
161
162 /*
163  * An rbd image specification.
164  *
165  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
166  * identify an image.  Each rbd_dev structure includes a pointer to
167  * an rbd_spec structure that encapsulates this identity.
168  *
169  * Each of the ids in an rbd_spec has an associated name.  For a
170  * user-mapped image, the names are supplied and the ids associated
171  * with them are looked up.  For a layered image, a parent image is
172  * defined by the tuple, and the names are looked up.
173  *
174  * An rbd_dev structure contains a parent_spec pointer which is
175  * non-null if the image it represents is a child in a layered
176  * image.  This pointer will refer to the rbd_spec structure used
177  * by the parent rbd_dev for its own identity (i.e., the structure
178  * is shared between the parent and child).
179  *
180  * Since these structures are populated once, during the discovery
181  * phase of image construction, they are effectively immutable so
182  * we make no effort to synchronize access to them.
183  *
184  * Note that code herein does not assume the image name is known (it
185  * could be a null pointer).
186  */
187 struct rbd_spec {
188         u64             pool_id;
189         const char      *pool_name;
190         const char      *pool_ns;       /* NULL if default, never "" */
191
192         const char      *image_id;
193         const char      *image_name;
194
195         u64             snap_id;
196         const char      *snap_name;
197
198         struct kref     kref;
199 };
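
/*
 * Illustrative example (hypothetical names): mapping the image "foo" in
 * pool "rbd" at snapshot "snap1" yields a spec whose pool_name, image_name
 * and snap_name are "rbd", "foo" and "snap1", and whose pool_id, image_id
 * and snap_id hold the corresponding looked-up ids.
 */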
200
201 /*
202  * An instance of the client.  Multiple devices may share an rbd client.
203  */
204 struct rbd_client {
205         struct ceph_client      *client;
206         struct kref             kref;
207         struct list_head        node;
208 };
209
210 struct pending_result {
211         int                     result;         /* first nonzero result */
212         int                     num_pending;
213 };
214
215 struct rbd_img_request;
216
217 enum obj_request_type {
218         OBJ_REQUEST_NODATA = 1,
219         OBJ_REQUEST_BIO,        /* pointer into provided bio (list) */
220         OBJ_REQUEST_BVECS,      /* pointer into provided bio_vec array */
221         OBJ_REQUEST_OWN_BVECS,  /* private bio_vec array, doesn't own pages */
222 };
223
224 enum obj_operation_type {
225         OBJ_OP_READ = 1,
226         OBJ_OP_WRITE,
227         OBJ_OP_DISCARD,
228         OBJ_OP_ZEROOUT,
229 };
230
231 #define RBD_OBJ_FLAG_DELETION                   (1U << 0)
232 #define RBD_OBJ_FLAG_COPYUP_ENABLED             (1U << 1)
233 #define RBD_OBJ_FLAG_COPYUP_ZEROS               (1U << 2)
234 #define RBD_OBJ_FLAG_MAY_EXIST                  (1U << 3)
235 #define RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT       (1U << 4)
236
237 enum rbd_obj_read_state {
238         RBD_OBJ_READ_START = 1,
239         RBD_OBJ_READ_OBJECT,
240         RBD_OBJ_READ_PARENT,
241 };
242
243 /*
244  * Writes go through the following state machine to deal with
245  * layering:
246  *
247  *            . . . . . RBD_OBJ_WRITE_GUARD. . . . . . . . . . . . . .
248  *            .                 |                                    .
249  *            .                 v                                    .
250  *            .    RBD_OBJ_WRITE_READ_FROM_PARENT. . .               .
251  *            .                 |                    .               .
252  *            .                 v                    v (deep-copyup  .
253  *    (image  .   RBD_OBJ_WRITE_COPYUP_EMPTY_SNAPC   .  not needed)  .
254  * flattened) v                 |                    .               .
255  *            .                 v                    .               .
256  *            . . . .RBD_OBJ_WRITE_COPYUP_OPS. . . . .      (copyup  .
257  *                              |                        not needed) v
258  *                              v                                    .
259  *                            done . . . . . . . . . . . . . . . . . .
260  *                              ^
261  *                              |
262  *                     RBD_OBJ_WRITE_FLAT
263  *
264  * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
265  * assert_exists guard is needed or not (in some cases it's not needed
266  * even if there is a parent).
267  */
268 enum rbd_obj_write_state {
269         RBD_OBJ_WRITE_START = 1,
270         RBD_OBJ_WRITE_PRE_OBJECT_MAP,
271         RBD_OBJ_WRITE_OBJECT,
272         __RBD_OBJ_WRITE_COPYUP,
273         RBD_OBJ_WRITE_COPYUP,
274         RBD_OBJ_WRITE_POST_OBJECT_MAP,
275 };
276
277 enum rbd_obj_copyup_state {
278         RBD_OBJ_COPYUP_START = 1,
279         RBD_OBJ_COPYUP_READ_PARENT,
280         __RBD_OBJ_COPYUP_OBJECT_MAPS,
281         RBD_OBJ_COPYUP_OBJECT_MAPS,
282         __RBD_OBJ_COPYUP_WRITE_OBJECT,
283         RBD_OBJ_COPYUP_WRITE_OBJECT,
284 };
285
286 struct rbd_obj_request {
287         struct ceph_object_extent ex;
288         unsigned int            flags;  /* RBD_OBJ_FLAG_* */
289         union {
290                 enum rbd_obj_read_state  read_state;    /* for reads */
291                 enum rbd_obj_write_state write_state;   /* for writes */
292         };
293
294         struct rbd_img_request  *img_request;
295         struct ceph_file_extent *img_extents;
296         u32                     num_img_extents;
297
298         union {
299                 struct ceph_bio_iter    bio_pos;
300                 struct {
301                         struct ceph_bvec_iter   bvec_pos;
302                         u32                     bvec_count;
303                         u32                     bvec_idx;
304                 };
305         };
306
307         enum rbd_obj_copyup_state copyup_state;
308         struct bio_vec          *copyup_bvecs;
309         u32                     copyup_bvec_count;
310
311         struct list_head        osd_reqs;       /* w/ r_private_item */
312
313         struct mutex            state_mutex;
314         struct pending_result   pending;
315         struct kref             kref;
316 };
317
318 enum img_req_flags {
319         IMG_REQ_CHILD,          /* initiator: block = 0, child image = 1 */
320         IMG_REQ_LAYERED,        /* ENOENT handling: normal = 0, layered = 1 */
321 };
322
323 enum rbd_img_state {
324         RBD_IMG_START = 1,
325         RBD_IMG_EXCLUSIVE_LOCK,
326         __RBD_IMG_OBJECT_REQUESTS,
327         RBD_IMG_OBJECT_REQUESTS,
328 };
329
330 struct rbd_img_request {
331         struct rbd_device       *rbd_dev;
332         enum obj_operation_type op_type;
333         enum obj_request_type   data_type;
334         unsigned long           flags;
335         enum rbd_img_state      state;
336         union {
337                 u64                     snap_id;        /* for reads */
338                 struct ceph_snap_context *snapc;        /* for writes */
339         };
340         union {
341                 struct request          *rq;            /* block request */
342                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
343         };
344
345         struct list_head        lock_item;
346         struct list_head        object_extents; /* obj_req.ex structs */
347
348         struct mutex            state_mutex;
349         struct pending_result   pending;
350         struct work_struct      work;
351         int                     work_result;
352         struct kref             kref;
353 };
354
355 #define for_each_obj_request(ireq, oreq) \
356         list_for_each_entry(oreq, &(ireq)->object_extents, ex.oe_item)
357 #define for_each_obj_request_safe(ireq, oreq, n) \
358         list_for_each_entry_safe(oreq, n, &(ireq)->object_extents, ex.oe_item)
359
360 enum rbd_watch_state {
361         RBD_WATCH_STATE_UNREGISTERED,
362         RBD_WATCH_STATE_REGISTERED,
363         RBD_WATCH_STATE_ERROR,
364 };
365
366 enum rbd_lock_state {
367         RBD_LOCK_STATE_UNLOCKED,
368         RBD_LOCK_STATE_LOCKED,
369         RBD_LOCK_STATE_RELEASING,
370 };
371
372 /* WatchNotify::ClientId */
373 struct rbd_client_id {
374         u64 gid;
375         u64 handle;
376 };
377
378 struct rbd_mapping {
379         u64                     size;
380         u64                     features;
381 };
382
383 /*
384  * a single device
385  */
386 struct rbd_device {
387         int                     dev_id;         /* blkdev unique id */
388
389         int                     major;          /* blkdev assigned major */
390         int                     minor;
391         struct gendisk          *disk;          /* blkdev's gendisk and rq */
392
393         u32                     image_format;   /* Either 1 or 2 */
394         struct rbd_client       *rbd_client;
395
396         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
397
398         spinlock_t              lock;           /* queue, flags, open_count */
399
400         struct rbd_image_header header;
401         unsigned long           flags;          /* possibly lock protected */
402         struct rbd_spec         *spec;
403         struct rbd_options      *opts;
404         char                    *config_info;   /* add{,_single_major} string */
405
406         struct ceph_object_id   header_oid;
407         struct ceph_object_locator header_oloc;
408
409         struct ceph_file_layout layout;         /* used for all rbd requests */
410
411         struct mutex            watch_mutex;
412         enum rbd_watch_state    watch_state;
413         struct ceph_osd_linger_request *watch_handle;
414         u64                     watch_cookie;
415         struct delayed_work     watch_dwork;
416
417         struct rw_semaphore     lock_rwsem;
418         enum rbd_lock_state     lock_state;
419         char                    lock_cookie[32];
420         struct rbd_client_id    owner_cid;
421         struct work_struct      acquired_lock_work;
422         struct work_struct      released_lock_work;
423         struct delayed_work     lock_dwork;
424         struct work_struct      unlock_work;
425         spinlock_t              lock_lists_lock;
426         struct list_head        acquiring_list;
427         struct list_head        running_list;
428         struct completion       acquire_wait;
429         int                     acquire_err;
430         struct completion       releasing_wait;
431
432         spinlock_t              object_map_lock;
433         u8                      *object_map;
434         u64                     object_map_size;        /* in objects */
435         u64                     object_map_flags;
436
437         struct workqueue_struct *task_wq;
438
439         struct rbd_spec         *parent_spec;
440         u64                     parent_overlap;
441         atomic_t                parent_ref;
442         struct rbd_device       *parent;
443
444         /* Block layer tags. */
445         struct blk_mq_tag_set   tag_set;
446
447         /* protects updating the header */
448         struct rw_semaphore     header_rwsem;
449
450         struct rbd_mapping      mapping;
451
452         struct list_head        node;
453
454         /* sysfs related */
455         struct device           dev;
456         unsigned long           open_count;     /* protected by lock */
457 };
458
459 /*
460  * Flag bits for rbd_dev->flags:
461  * - REMOVING (which is coupled with rbd_dev->open_count) is protected
462  *   by rbd_dev->lock
463  */
464 enum rbd_dev_flags {
465         RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
466         RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
467         RBD_DEV_FLAG_READONLY,  /* -o ro or snapshot */
468 };
469
470 static DEFINE_MUTEX(client_mutex);      /* Serialize client creation */
471
472 static LIST_HEAD(rbd_dev_list);    /* devices */
473 static DEFINE_SPINLOCK(rbd_dev_list_lock);
474
475 static LIST_HEAD(rbd_client_list);              /* clients */
476 static DEFINE_SPINLOCK(rbd_client_list_lock);
477
478 /* Slab caches for frequently-allocated structures */
479
480 static struct kmem_cache        *rbd_img_request_cache;
481 static struct kmem_cache        *rbd_obj_request_cache;
482
483 static int rbd_major;
484 static DEFINE_IDA(rbd_dev_id_ida);
485
486 static struct workqueue_struct *rbd_wq;
487
488 static struct ceph_snap_context rbd_empty_snapc = {
489         .nref = REFCOUNT_INIT(1),
490 };
491
492 /*
493  * single-major requires version >= 0.75 of the userspace rbd utility.
494  */
495 static bool single_major = true;
496 module_param(single_major, bool, 0444);
497 MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
498
499 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count);
500 static ssize_t remove_store(struct bus_type *bus, const char *buf,
501                             size_t count);
502 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
503                                       size_t count);
504 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
505                                          size_t count);
506 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
507
508 static int rbd_dev_id_to_minor(int dev_id)
509 {
510         return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
511 }
512
513 static int minor_to_rbd_dev_id(int minor)
514 {
515         return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
516 }
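
/*
 * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT == 4 each device id
 * owns a block of 16 minors, so
 *
 *	rbd_dev_id_to_minor(3)  == 3 << 4 == 48
 *	minor_to_rbd_dev_id(48) == 48 >> 4 == 3
 *
 * and minors 48..63 cover device id 3 and its partitions.
 */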
517
518 static bool rbd_is_ro(struct rbd_device *rbd_dev)
519 {
520         return test_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
521 }
522
523 static bool rbd_is_snap(struct rbd_device *rbd_dev)
524 {
525         return rbd_dev->spec->snap_id != CEPH_NOSNAP;
526 }
527
528 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
529 {
530         lockdep_assert_held(&rbd_dev->lock_rwsem);
531
532         return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
533                rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
534 }
535
536 static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
537 {
538         bool is_lock_owner;
539
540         down_read(&rbd_dev->lock_rwsem);
541         is_lock_owner = __rbd_is_lock_owner(rbd_dev);
542         up_read(&rbd_dev->lock_rwsem);
543         return is_lock_owner;
544 }
545
546 static ssize_t supported_features_show(struct bus_type *bus, char *buf)
547 {
548         return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
549 }
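
/*
 * Worked example: with the feature bits defined above, the reported mask
 * is bits 0-5 plus bits 7 and 8, i.e. 0x3f + 0x80 + 0x100 == 0x1bf, so
 * reading /sys/bus/rbd/supported_features would show "0x1bf".
 */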
550
551 static BUS_ATTR_WO(add);
552 static BUS_ATTR_WO(remove);
553 static BUS_ATTR_WO(add_single_major);
554 static BUS_ATTR_WO(remove_single_major);
555 static BUS_ATTR_RO(supported_features);
556
557 static struct attribute *rbd_bus_attrs[] = {
558         &bus_attr_add.attr,
559         &bus_attr_remove.attr,
560         &bus_attr_add_single_major.attr,
561         &bus_attr_remove_single_major.attr,
562         &bus_attr_supported_features.attr,
563         NULL,
564 };
565
566 static umode_t rbd_bus_is_visible(struct kobject *kobj,
567                                   struct attribute *attr, int index)
568 {
569         if (!single_major &&
570             (attr == &bus_attr_add_single_major.attr ||
571              attr == &bus_attr_remove_single_major.attr))
572                 return 0;
573
574         return attr->mode;
575 }
576
577 static const struct attribute_group rbd_bus_group = {
578         .attrs = rbd_bus_attrs,
579         .is_visible = rbd_bus_is_visible,
580 };
581 __ATTRIBUTE_GROUPS(rbd_bus);
582
583 static struct bus_type rbd_bus_type = {
584         .name           = "rbd",
585         .bus_groups     = rbd_bus_groups,
586 };
587
588 static void rbd_root_dev_release(struct device *dev)
589 {
590 }
591
592 static struct device rbd_root_dev = {
593         .init_name =    "rbd",
594         .release =      rbd_root_dev_release,
595 };
596
597 static __printf(2, 3)
598 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
599 {
600         struct va_format vaf;
601         va_list args;
602
603         va_start(args, fmt);
604         vaf.fmt = fmt;
605         vaf.va = &args;
606
607         if (!rbd_dev)
608                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
609         else if (rbd_dev->disk)
610                 printk(KERN_WARNING "%s: %s: %pV\n",
611                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
612         else if (rbd_dev->spec && rbd_dev->spec->image_name)
613                 printk(KERN_WARNING "%s: image %s: %pV\n",
614                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
615         else if (rbd_dev->spec && rbd_dev->spec->image_id)
616                 printk(KERN_WARNING "%s: id %s: %pV\n",
617                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
618         else    /* punt */
619                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
620                         RBD_DRV_NAME, rbd_dev, &vaf);
621         va_end(args);
622 }
623
624 #ifdef RBD_DEBUG
625 #define rbd_assert(expr)                                                \
626                 if (unlikely(!(expr))) {                                \
627                         printk(KERN_ERR "\nAssertion failure in %s() "  \
628                                                 "at line %d:\n\n"       \
629                                         "\trbd_assert(%s);\n\n",        \
630                                         __func__, __LINE__, #expr);     \
631                         BUG();                                          \
632                 }
633 #else /* !RBD_DEBUG */
634 #  define rbd_assert(expr)      ((void) 0)
635 #endif /* !RBD_DEBUG */
636
637 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
638
639 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
640 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
641 static int rbd_dev_header_info(struct rbd_device *rbd_dev);
642 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
643 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
644                                         u64 snap_id);
645 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
646                                 u8 *order, u64 *snap_size);
647 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
648                 u64 *snap_features);
649 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev);
650
651 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result);
652 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result);
653
654 /*
655  * Return true if nothing else is pending.
656  */
657 static bool pending_result_dec(struct pending_result *pending, int *result)
658 {
659         rbd_assert(pending->num_pending > 0);
660
661         if (*result && !pending->result)
662                 pending->result = *result;
663         if (--pending->num_pending)
664                 return false;
665
666         *result = pending->result;
667         return true;
668 }
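
/*
 * Worked example: with pending == { .result = 0, .num_pending = 2 }, a
 * first completion calling pending_result_dec() with *result == -EIO
 * records -EIO and returns false; the second call (with *result == 0)
 * returns true and hands back -EIO as the overall result.
 */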
669
670 static int rbd_open(struct block_device *bdev, fmode_t mode)
671 {
672         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
673         bool removing = false;
674
675         spin_lock_irq(&rbd_dev->lock);
676         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
677                 removing = true;
678         else
679                 rbd_dev->open_count++;
680         spin_unlock_irq(&rbd_dev->lock);
681         if (removing)
682                 return -ENOENT;
683
684         (void) get_device(&rbd_dev->dev);
685
686         return 0;
687 }
688
689 static void rbd_release(struct gendisk *disk, fmode_t mode)
690 {
691         struct rbd_device *rbd_dev = disk->private_data;
692         unsigned long open_count_before;
693
694         spin_lock_irq(&rbd_dev->lock);
695         open_count_before = rbd_dev->open_count--;
696         spin_unlock_irq(&rbd_dev->lock);
697         rbd_assert(open_count_before > 0);
698
699         put_device(&rbd_dev->dev);
700 }
701
702 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
703 {
704         int ro;
705
706         if (get_user(ro, (int __user *)arg))
707                 return -EFAULT;
708
709         /* Snapshots can't be marked read-write */
710         if (rbd_is_snap(rbd_dev) && !ro)
711                 return -EROFS;
712
713         /* Let blkdev_roset() handle it */
714         return -ENOTTY;
715 }
716
717 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
718                         unsigned int cmd, unsigned long arg)
719 {
720         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
721         int ret;
722
723         switch (cmd) {
724         case BLKROSET:
725                 ret = rbd_ioctl_set_ro(rbd_dev, arg);
726                 break;
727         default:
728                 ret = -ENOTTY;
729         }
730
731         return ret;
732 }
733
734 #ifdef CONFIG_COMPAT
735 static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
736                                 unsigned int cmd, unsigned long arg)
737 {
738         return rbd_ioctl(bdev, mode, cmd, arg);
739 }
740 #endif /* CONFIG_COMPAT */
741
742 static const struct block_device_operations rbd_bd_ops = {
743         .owner                  = THIS_MODULE,
744         .open                   = rbd_open,
745         .release                = rbd_release,
746         .ioctl                  = rbd_ioctl,
747 #ifdef CONFIG_COMPAT
748         .compat_ioctl           = rbd_compat_ioctl,
749 #endif
750 };
751
752 /*
753  * Initialize an rbd client instance.  Success or not, this function
754  * consumes ceph_opts.  Caller holds client_mutex.
755  */
756 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
757 {
758         struct rbd_client *rbdc;
759         int ret = -ENOMEM;
760
761         dout("%s:\n", __func__);
762         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
763         if (!rbdc)
764                 goto out_opt;
765
766         kref_init(&rbdc->kref);
767         INIT_LIST_HEAD(&rbdc->node);
768
769         rbdc->client = ceph_create_client(ceph_opts, rbdc);
770         if (IS_ERR(rbdc->client))
771                 goto out_rbdc;
772         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
773
774         ret = ceph_open_session(rbdc->client);
775         if (ret < 0)
776                 goto out_client;
777
778         spin_lock(&rbd_client_list_lock);
779         list_add_tail(&rbdc->node, &rbd_client_list);
780         spin_unlock(&rbd_client_list_lock);
781
782         dout("%s: rbdc %p\n", __func__, rbdc);
783
784         return rbdc;
785 out_client:
786         ceph_destroy_client(rbdc->client);
787 out_rbdc:
788         kfree(rbdc);
789 out_opt:
790         if (ceph_opts)
791                 ceph_destroy_options(ceph_opts);
792         dout("%s: error %d\n", __func__, ret);
793
794         return ERR_PTR(ret);
795 }
796
797 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
798 {
799         kref_get(&rbdc->kref);
800
801         return rbdc;
802 }
803
804 /*
805  * Find a ceph client with specific addr and configuration.  If
806  * found, bump its reference count.
807  */
808 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
809 {
810         struct rbd_client *client_node;
811         bool found = false;
812
813         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
814                 return NULL;
815
816         spin_lock(&rbd_client_list_lock);
817         list_for_each_entry(client_node, &rbd_client_list, node) {
818                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
819                         __rbd_get_client(client_node);
820
821                         found = true;
822                         break;
823                 }
824         }
825         spin_unlock(&rbd_client_list_lock);
826
827         return found ? client_node : NULL;
828 }
829
830 /*
831  * (Per device) rbd map options
832  */
833 enum {
834         Opt_queue_depth,
835         Opt_alloc_size,
836         Opt_lock_timeout,
837         Opt_last_int,
838         /* int args above */
839         Opt_pool_ns,
840         Opt_last_string,
841         /* string args above */
842         Opt_read_only,
843         Opt_read_write,
844         Opt_lock_on_read,
845         Opt_exclusive,
846         Opt_notrim,
847         Opt_err
848 };
849
850 static match_table_t rbd_opts_tokens = {
851         {Opt_queue_depth, "queue_depth=%d"},
852         {Opt_alloc_size, "alloc_size=%d"},
853         {Opt_lock_timeout, "lock_timeout=%d"},
854         /* int args above */
855         {Opt_pool_ns, "_pool_ns=%s"},
856         /* string args above */
857         {Opt_read_only, "read_only"},
858         {Opt_read_only, "ro"},          /* Alternate spelling */
859         {Opt_read_write, "read_write"},
860         {Opt_read_write, "rw"},         /* Alternate spelling */
861         {Opt_lock_on_read, "lock_on_read"},
862         {Opt_exclusive, "exclusive"},
863         {Opt_notrim, "notrim"},
864         {Opt_err, NULL}
865 };
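
/*
 * Illustrative example (assumed typical usage): a per-device option string
 * such as
 *
 *	queue_depth=128,alloc_size=65536,lock_on_read,read_only
 *
 * is split into individual tokens by the option parsing code, and each
 * token is matched against the table above by parse_rbd_opts_token()
 * below.
 */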
866
867 struct rbd_options {
868         int     queue_depth;
869         int     alloc_size;
870         unsigned long   lock_timeout;
871         bool    read_only;
872         bool    lock_on_read;
873         bool    exclusive;
874         bool    trim;
875 };
876
877 #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ
878 #define RBD_ALLOC_SIZE_DEFAULT  (64 * 1024)
879 #define RBD_LOCK_TIMEOUT_DEFAULT 0  /* no timeout */
880 #define RBD_READ_ONLY_DEFAULT   false
881 #define RBD_LOCK_ON_READ_DEFAULT false
882 #define RBD_EXCLUSIVE_DEFAULT   false
883 #define RBD_TRIM_DEFAULT        true
884
885 struct parse_rbd_opts_ctx {
886         struct rbd_spec         *spec;
887         struct rbd_options      *opts;
888 };
889
890 static int parse_rbd_opts_token(char *c, void *private)
891 {
892         struct parse_rbd_opts_ctx *pctx = private;
893         substring_t argstr[MAX_OPT_ARGS];
894         int token, intval, ret;
895
896         token = match_token(c, rbd_opts_tokens, argstr);
897         if (token < Opt_last_int) {
898                 ret = match_int(&argstr[0], &intval);
899                 if (ret < 0) {
900                         pr_err("bad option arg (not int) at '%s'\n", c);
901                         return ret;
902                 }
903                 dout("got int token %d val %d\n", token, intval);
904         } else if (token > Opt_last_int && token < Opt_last_string) {
905                 dout("got string token %d val %s\n", token, argstr[0].from);
906         } else {
907                 dout("got token %d\n", token);
908         }
909
910         switch (token) {
911         case Opt_queue_depth:
912                 if (intval < 1) {
913                         pr_err("queue_depth out of range\n");
914                         return -EINVAL;
915                 }
916                 pctx->opts->queue_depth = intval;
917                 break;
918         case Opt_alloc_size:
919                 if (intval < SECTOR_SIZE) {
920                         pr_err("alloc_size out of range\n");
921                         return -EINVAL;
922                 }
923                 if (!is_power_of_2(intval)) {
924                         pr_err("alloc_size must be a power of 2\n");
925                         return -EINVAL;
926                 }
927                 pctx->opts->alloc_size = intval;
928                 break;
929         case Opt_lock_timeout:
930                 /* 0 is "wait forever" (i.e. infinite timeout) */
931                 if (intval < 0 || intval > INT_MAX / 1000) {
932                         pr_err("lock_timeout out of range\n");
933                         return -EINVAL;
934                 }
935                 pctx->opts->lock_timeout = msecs_to_jiffies(intval * 1000);
936                 break;
937         case Opt_pool_ns:
938                 kfree(pctx->spec->pool_ns);
939                 pctx->spec->pool_ns = match_strdup(argstr);
940                 if (!pctx->spec->pool_ns)
941                         return -ENOMEM;
942                 break;
943         case Opt_read_only:
944                 pctx->opts->read_only = true;
945                 break;
946         case Opt_read_write:
947                 pctx->opts->read_only = false;
948                 break;
949         case Opt_lock_on_read:
950                 pctx->opts->lock_on_read = true;
951                 break;
952         case Opt_exclusive:
953                 pctx->opts->exclusive = true;
954                 break;
955         case Opt_notrim:
956                 pctx->opts->trim = false;
957                 break;
958         default:
959                 /* libceph prints "bad option" msg */
960                 return -EINVAL;
961         }
962
963         return 0;
964 }
965
966 static char *obj_op_name(enum obj_operation_type op_type)
967 {
968         switch (op_type) {
969         case OBJ_OP_READ:
970                 return "read";
971         case OBJ_OP_WRITE:
972                 return "write";
973         case OBJ_OP_DISCARD:
974                 return "discard";
975         case OBJ_OP_ZEROOUT:
976                 return "zeroout";
977         default:
978                 return "???";
979         }
980 }
981
982 /*
983  * Destroy ceph client
984  *
985  * rbd_client_list_lock is taken here, so the caller must not hold it.
986  */
987 static void rbd_client_release(struct kref *kref)
988 {
989         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
990
991         dout("%s: rbdc %p\n", __func__, rbdc);
992         spin_lock(&rbd_client_list_lock);
993         list_del(&rbdc->node);
994         spin_unlock(&rbd_client_list_lock);
995
996         ceph_destroy_client(rbdc->client);
997         kfree(rbdc);
998 }
999
1000 /*
1001  * Drop reference to ceph client node. If it's not referenced anymore, release
1002  * it.
1003  */
1004 static void rbd_put_client(struct rbd_client *rbdc)
1005 {
1006         if (rbdc)
1007                 kref_put(&rbdc->kref, rbd_client_release);
1008 }
1009
1010 /*
1011  * Get a ceph client with specific addr and configuration; if one does
1012  * not exist, create it.  Either way, ceph_opts is consumed by this
1013  * function.
1014  */
1015 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
1016 {
1017         struct rbd_client *rbdc;
1018         int ret;
1019
1020         mutex_lock(&client_mutex);
1021         rbdc = rbd_client_find(ceph_opts);
1022         if (rbdc) {
1023                 ceph_destroy_options(ceph_opts);
1024
1025                 /*
1026                  * Using an existing client.  Make sure ->pg_pools is up to
1027                  * date before we look up the pool id in do_rbd_add().
1028                  */
1029                 ret = ceph_wait_for_latest_osdmap(rbdc->client,
1030                                         rbdc->client->options->mount_timeout);
1031                 if (ret) {
1032                         rbd_warn(NULL, "failed to get latest osdmap: %d", ret);
1033                         rbd_put_client(rbdc);
1034                         rbdc = ERR_PTR(ret);
1035                 }
1036         } else {
1037                 rbdc = rbd_client_create(ceph_opts);
1038         }
1039         mutex_unlock(&client_mutex);
1040
1041         return rbdc;
1042 }
1043
1044 static bool rbd_image_format_valid(u32 image_format)
1045 {
1046         return image_format == 1 || image_format == 2;
1047 }
1048
1049 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
1050 {
1051         size_t size;
1052         u32 snap_count;
1053
1054         /* The header has to start with the magic rbd header text */
1055         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
1056                 return false;
1057
1058         /* The bio layer requires at least sector-sized I/O */
1059
1060         if (ondisk->options.order < SECTOR_SHIFT)
1061                 return false;
1062
1063         /* If we use u64 in a few spots we may be able to loosen this */
1064
1065         if (ondisk->options.order > 8 * sizeof (int) - 1)
1066                 return false;
1067
1068         /*
1069          * The size of a snapshot header has to fit in a size_t, and
1070          * that limits the number of snapshots.
1071          */
1072         snap_count = le32_to_cpu(ondisk->snap_count);
1073         size = SIZE_MAX - sizeof (struct ceph_snap_context);
1074         if (snap_count > size / sizeof (__le64))
1075                 return false;
1076
1077         /*
1078          * Not only that, but the size of the entire snapshot
1079          * header must also be representable in a size_t.
1080          */
1081         size -= snap_count * sizeof (__le64);
1082         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
1083                 return false;
1084
1085         return true;
1086 }
1087
1088 /*
1089  * returns the size of an object in the image
1090  */
1091 static u32 rbd_obj_bytes(struct rbd_image_header *header)
1092 {
1093         return 1U << header->obj_order;
1094 }
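
/*
 * For example, an obj_order of 22 gives 1U << 22 == 4194304 bytes, i.e.
 * 4 MiB objects (commonly the default object size for rbd images).
 */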
1095
1096 static void rbd_init_layout(struct rbd_device *rbd_dev)
1097 {
1098         if (rbd_dev->header.stripe_unit == 0 ||
1099             rbd_dev->header.stripe_count == 0) {
1100                 rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
1101                 rbd_dev->header.stripe_count = 1;
1102         }
1103
1104         rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
1105         rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
1106         rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
1107         rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
1108                           rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
1109         RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
1110 }
1111
1112 /*
1113  * Fill an rbd image header with information from the given format 1
1114  * on-disk header.
1115  */
1116 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
1117                                  struct rbd_image_header_ondisk *ondisk)
1118 {
1119         struct rbd_image_header *header = &rbd_dev->header;
1120         bool first_time = header->object_prefix == NULL;
1121         struct ceph_snap_context *snapc;
1122         char *object_prefix = NULL;
1123         char *snap_names = NULL;
1124         u64 *snap_sizes = NULL;
1125         u32 snap_count;
1126         int ret = -ENOMEM;
1127         u32 i;
1128
1129         /* Allocate this now to avoid having to handle failure below */
1130
1131         if (first_time) {
1132                 object_prefix = kstrndup(ondisk->object_prefix,
1133                                          sizeof(ondisk->object_prefix),
1134                                          GFP_KERNEL);
1135                 if (!object_prefix)
1136                         return -ENOMEM;
1137         }
1138
1139         /* Allocate the snapshot context and fill it in */
1140
1141         snap_count = le32_to_cpu(ondisk->snap_count);
1142         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
1143         if (!snapc)
1144                 goto out_err;
1145         snapc->seq = le64_to_cpu(ondisk->snap_seq);
1146         if (snap_count) {
1147                 struct rbd_image_snap_ondisk *snaps;
1148                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
1149
1150                 /* We'll keep a copy of the snapshot names... */
1151
1152                 if (snap_names_len > (u64)SIZE_MAX)
1153                         goto out_2big;
1154                 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
1155                 if (!snap_names)
1156                         goto out_err;
1157
1158                 /* ...as well as the array of their sizes. */
1159                 snap_sizes = kmalloc_array(snap_count,
1160                                            sizeof(*header->snap_sizes),
1161                                            GFP_KERNEL);
1162                 if (!snap_sizes)
1163                         goto out_err;
1164
1165                 /*
1166                  * Copy the names, and fill in each snapshot's id
1167                  * and size.
1168                  *
1169                  * Note that rbd_dev_v1_header_info() guarantees the
1170                  * ondisk buffer we're working with has
1171                  * snap_names_len bytes beyond the end of the
1172                  * snapshot id array, so this memcpy() is safe.
1173                  */
1174                 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
1175                 snaps = ondisk->snaps;
1176                 for (i = 0; i < snap_count; i++) {
1177                         snapc->snaps[i] = le64_to_cpu(snaps[i].id);
1178                         snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
1179                 }
1180         }
1181
1182         /* We won't fail any more, fill in the header */
1183
1184         if (first_time) {
1185                 header->object_prefix = object_prefix;
1186                 header->obj_order = ondisk->options.order;
1187                 rbd_init_layout(rbd_dev);
1188         } else {
1189                 ceph_put_snap_context(header->snapc);
1190                 kfree(header->snap_names);
1191                 kfree(header->snap_sizes);
1192         }
1193
1194         /* The remaining fields always get updated (when we refresh) */
1195
1196         header->image_size = le64_to_cpu(ondisk->image_size);
1197         header->snapc = snapc;
1198         header->snap_names = snap_names;
1199         header->snap_sizes = snap_sizes;
1200
1201         return 0;
1202 out_2big:
1203         ret = -EIO;
1204 out_err:
1205         kfree(snap_sizes);
1206         kfree(snap_names);
1207         ceph_put_snap_context(snapc);
1208         kfree(object_prefix);
1209
1210         return ret;
1211 }
1212
1213 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1214 {
1215         const char *snap_name;
1216
1217         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1218
1219         /* Skip over names until we find the one we are looking for */
1220
1221         snap_name = rbd_dev->header.snap_names;
1222         while (which--)
1223                 snap_name += strlen(snap_name) + 1;
1224
1225         return kstrdup(snap_name, GFP_KERNEL);
1226 }
1227
1228 /*
1229  * Snapshot id comparison function for use with qsort()/bsearch().
1230  * Note that result is for snapshots in *descending* order.
1231  */
1232 static int snapid_compare_reverse(const void *s1, const void *s2)
1233 {
1234         u64 snap_id1 = *(u64 *)s1;
1235         u64 snap_id2 = *(u64 *)s2;
1236
1237         if (snap_id1 < snap_id2)
1238                 return 1;
1239         return snap_id1 == snap_id2 ? 0 : -1;
1240 }
1241
1242 /*
1243  * Search a snapshot context to see if the given snapshot id is
1244  * present.
1245  *
1246  * Returns the position of the snapshot id in the array if it's found,
1247  * or BAD_SNAP_INDEX otherwise.
1248  *
1249  * Note: The snapshot array is kept sorted (by the osd) in
1250  * reverse order, highest snapshot id first.
1251  */
1252 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1253 {
1254         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1255         u64 *found;
1256
1257         found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1258                                 sizeof (snap_id), snapid_compare_reverse);
1259
1260         return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1261 }
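
/*
 * Worked example: with snapc->snaps == { 15, 9, 4 } (descending, as the
 * osd keeps it), rbd_dev_snap_index() returns 1 for snap_id 9 and
 * BAD_SNAP_INDEX for snap_id 7, since bsearch() depends on the same
 * descending order that snapid_compare_reverse() encodes.
 */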
1262
1263 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1264                                         u64 snap_id)
1265 {
1266         u32 which;
1267         const char *snap_name;
1268
1269         which = rbd_dev_snap_index(rbd_dev, snap_id);
1270         if (which == BAD_SNAP_INDEX)
1271                 return ERR_PTR(-ENOENT);
1272
1273         snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1274         return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1275 }
1276
1277 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1278 {
1279         if (snap_id == CEPH_NOSNAP)
1280                 return RBD_SNAP_HEAD_NAME;
1281
1282         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1283         if (rbd_dev->image_format == 1)
1284                 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1285
1286         return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1287 }
1288
1289 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1290                                 u64 *snap_size)
1291 {
1292         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1293         if (snap_id == CEPH_NOSNAP) {
1294                 *snap_size = rbd_dev->header.image_size;
1295         } else if (rbd_dev->image_format == 1) {
1296                 u32 which;
1297
1298                 which = rbd_dev_snap_index(rbd_dev, snap_id);
1299                 if (which == BAD_SNAP_INDEX)
1300                         return -ENOENT;
1301
1302                 *snap_size = rbd_dev->header.snap_sizes[which];
1303         } else {
1304                 u64 size = 0;
1305                 int ret;
1306
1307                 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1308                 if (ret)
1309                         return ret;
1310
1311                 *snap_size = size;
1312         }
1313         return 0;
1314 }
1315
1316 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1317                         u64 *snap_features)
1318 {
1319         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1320         if (snap_id == CEPH_NOSNAP) {
1321                 *snap_features = rbd_dev->header.features;
1322         } else if (rbd_dev->image_format == 1) {
1323                 *snap_features = 0;     /* No features for format 1 */
1324         } else {
1325                 u64 features = 0;
1326                 int ret;
1327
1328                 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1329                 if (ret)
1330                         return ret;
1331
1332                 *snap_features = features;
1333         }
1334         return 0;
1335 }
1336
1337 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1338 {
1339         u64 snap_id = rbd_dev->spec->snap_id;
1340         u64 size = 0;
1341         u64 features = 0;
1342         int ret;
1343
1344         ret = rbd_snap_size(rbd_dev, snap_id, &size);
1345         if (ret)
1346                 return ret;
1347         ret = rbd_snap_features(rbd_dev, snap_id, &features);
1348         if (ret)
1349                 return ret;
1350
1351         rbd_dev->mapping.size = size;
1352         rbd_dev->mapping.features = features;
1353
1354         return 0;
1355 }
1356
1357 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1358 {
1359         rbd_dev->mapping.size = 0;
1360         rbd_dev->mapping.features = 0;
1361 }
1362
1363 static void zero_bvec(struct bio_vec *bv)
1364 {
1365         void *buf;
1366         unsigned long flags;
1367
1368         buf = bvec_kmap_irq(bv, &flags);
1369         memset(buf, 0, bv->bv_len);
1370         flush_dcache_page(bv->bv_page);
1371         bvec_kunmap_irq(buf, &flags);
1372 }
1373
1374 static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
1375 {
1376         struct ceph_bio_iter it = *bio_pos;
1377
1378         ceph_bio_iter_advance(&it, off);
1379         ceph_bio_iter_advance_step(&it, bytes, ({
1380                 zero_bvec(&bv);
1381         }));
1382 }
1383
1384 static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
1385 {
1386         struct ceph_bvec_iter it = *bvec_pos;
1387
1388         ceph_bvec_iter_advance(&it, off);
1389         ceph_bvec_iter_advance_step(&it, bytes, ({
1390                 zero_bvec(&bv);
1391         }));
1392 }
1393
1394 /*
1395  * Zero a range in @obj_req data buffer defined by a bio (list) or
1396  * (private) bio_vec array.
1397  *
1398  * @off is relative to the start of the data buffer.
1399  */
1400 static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
1401                                u32 bytes)
1402 {
1403         dout("%s %p data buf %u~%u\n", __func__, obj_req, off, bytes);
1404
1405         switch (obj_req->img_request->data_type) {
1406         case OBJ_REQUEST_BIO:
1407                 zero_bios(&obj_req->bio_pos, off, bytes);
1408                 break;
1409         case OBJ_REQUEST_BVECS:
1410         case OBJ_REQUEST_OWN_BVECS:
1411                 zero_bvecs(&obj_req->bvec_pos, off, bytes);
1412                 break;
1413         default:
1414                 BUG();
1415         }
1416 }
1417
1418 static void rbd_obj_request_destroy(struct kref *kref);
1419 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1420 {
1421         rbd_assert(obj_request != NULL);
1422         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1423                 kref_read(&obj_request->kref));
1424         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1425 }
1426
1427 static void rbd_img_request_destroy(struct kref *kref);
1428 static void rbd_img_request_put(struct rbd_img_request *img_request)
1429 {
1430         rbd_assert(img_request != NULL);
1431         dout("%s: img %p (was %d)\n", __func__, img_request,
1432                 kref_read(&img_request->kref));
1433         kref_put(&img_request->kref, rbd_img_request_destroy);
1434 }
1435
1436 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1437                                         struct rbd_obj_request *obj_request)
1438 {
1439         rbd_assert(obj_request->img_request == NULL);
1440
1441         /* Image request now owns object's original reference */
1442         obj_request->img_request = img_request;
1443         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1444 }
1445
1446 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1447                                         struct rbd_obj_request *obj_request)
1448 {
1449         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1450         list_del(&obj_request->ex.oe_item);
1451         rbd_assert(obj_request->img_request == img_request);
1452         rbd_obj_request_put(obj_request);
1453 }
1454
1455 static void rbd_osd_submit(struct ceph_osd_request *osd_req)
1456 {
1457         struct rbd_obj_request *obj_req = osd_req->r_priv;
1458
1459         dout("%s osd_req %p for obj_req %p objno %llu %llu~%llu\n",
1460              __func__, osd_req, obj_req, obj_req->ex.oe_objno,
1461              obj_req->ex.oe_off, obj_req->ex.oe_len);
1462         ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
1463 }
1464
1465 /*
1466  * The default/initial value for all image request flags is 0.  Each
1467  * is conditionally set to 1 at image request initialization time
1468  * and currently never changes thereafter.
1469  */
1470 static void img_request_layered_set(struct rbd_img_request *img_request)
1471 {
1472         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1473         smp_mb();
1474 }
1475
1476 static void img_request_layered_clear(struct rbd_img_request *img_request)
1477 {
1478         clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1479         smp_mb();
1480 }
1481
1482 static bool img_request_layered_test(struct rbd_img_request *img_request)
1483 {
1484         smp_mb();
1485         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1486 }
1487
1488 static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
1489 {
1490         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1491
1492         return !obj_req->ex.oe_off &&
1493                obj_req->ex.oe_len == rbd_dev->layout.object_size;
1494 }
1495
1496 static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
1497 {
1498         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1499
1500         return obj_req->ex.oe_off + obj_req->ex.oe_len ==
1501                                         rbd_dev->layout.object_size;
1502 }
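
/*
 * Worked example: with a 4 MiB object_size, an extent with oe_off == 0 and
 * oe_len == 4 MiB is "entire" (and also a tail), while oe_off == 1 MiB with
 * oe_len == 3 MiB is only a tail, since it ends exactly at the object
 * boundary.
 */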
1503
1504 /*
1505  * Must be called after rbd_obj_calc_img_extents().
1506  */
1507 static bool rbd_obj_copyup_enabled(struct rbd_obj_request *obj_req)
1508 {
1509         if (!obj_req->num_img_extents ||
1510             (rbd_obj_is_entire(obj_req) &&
1511              !obj_req->img_request->snapc->num_snaps))
1512                 return false;
1513
1514         return true;
1515 }
1516
1517 static u64 rbd_obj_img_extents_bytes(struct rbd_obj_request *obj_req)
1518 {
1519         return ceph_file_extents_bytes(obj_req->img_extents,
1520                                        obj_req->num_img_extents);
1521 }
1522
1523 static bool rbd_img_is_write(struct rbd_img_request *img_req)
1524 {
1525         switch (img_req->op_type) {
1526         case OBJ_OP_READ:
1527                 return false;
1528         case OBJ_OP_WRITE:
1529         case OBJ_OP_DISCARD:
1530         case OBJ_OP_ZEROOUT:
1531                 return true;
1532         default:
1533                 BUG();
1534         }
1535 }
1536
1537 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
1538 {
1539         struct rbd_obj_request *obj_req = osd_req->r_priv;
1540         int result;
1541
1542         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
1543              osd_req->r_result, obj_req);
1544
1545         /*
1546          * Writes aren't allowed to return a data payload.  In some
1547          * guarded write cases (e.g. stat + zero on an empty object)
1548          * a stat response makes it through, but we don't care.
1549          */
1550         if (osd_req->r_result > 0 && rbd_img_is_write(obj_req->img_request))
1551                 result = 0;
1552         else
1553                 result = osd_req->r_result;
1554
1555         rbd_obj_handle_request(obj_req, result);
1556 }
1557
1558 static void rbd_osd_format_read(struct ceph_osd_request *osd_req)
1559 {
1560         struct rbd_obj_request *obj_request = osd_req->r_priv;
1561
1562         osd_req->r_flags = CEPH_OSD_FLAG_READ;
1563         osd_req->r_snapid = obj_request->img_request->snap_id;
1564 }
1565
1566 static void rbd_osd_format_write(struct ceph_osd_request *osd_req)
1567 {
1568         struct rbd_obj_request *obj_request = osd_req->r_priv;
1569
1570         osd_req->r_flags = CEPH_OSD_FLAG_WRITE;
1571         ktime_get_real_ts64(&osd_req->r_mtime);
1572         osd_req->r_data_offset = obj_request->ex.oe_off;
1573 }
1574
1575 static struct ceph_osd_request *
1576 __rbd_obj_add_osd_request(struct rbd_obj_request *obj_req,
1577                           struct ceph_snap_context *snapc, int num_ops)
1578 {
1579         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
1580         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1581         struct ceph_osd_request *req;
1582         const char *name_format = rbd_dev->image_format == 1 ?
1583                                       RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
1584         int ret;
1585
1586         req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1587         if (!req)
1588                 return ERR_PTR(-ENOMEM);
1589
1590         list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
1591         req->r_callback = rbd_osd_req_callback;
1592         req->r_priv = obj_req;
1593
1594         /*
1595          * Data objects may be stored in a separate pool, but they always
1596          * use the same namespace in that pool as the header does in its pool.
1597          */
1598         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
1599         req->r_base_oloc.pool = rbd_dev->layout.pool_id;
1600
1601         ret = ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1602                                rbd_dev->header.object_prefix,
1603                                obj_req->ex.oe_objno);
1604         if (ret)
1605                 return ERR_PTR(ret);
1606
1607         return req;
1608 }
1609
1610 static struct ceph_osd_request *
1611 rbd_obj_add_osd_request(struct rbd_obj_request *obj_req, int num_ops)
1612 {
1613         return __rbd_obj_add_osd_request(obj_req, obj_req->img_request->snapc,
1614                                          num_ops);
1615 }
1616
1617 static struct rbd_obj_request *rbd_obj_request_create(void)
1618 {
1619         struct rbd_obj_request *obj_request;
1620
1621         obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
1622         if (!obj_request)
1623                 return NULL;
1624
1625         ceph_object_extent_init(&obj_request->ex);
1626         INIT_LIST_HEAD(&obj_request->osd_reqs);
1627         mutex_init(&obj_request->state_mutex);
1628         kref_init(&obj_request->kref);
1629
1630         dout("%s %p\n", __func__, obj_request);
1631         return obj_request;
1632 }
1633
1634 static void rbd_obj_request_destroy(struct kref *kref)
1635 {
1636         struct rbd_obj_request *obj_request;
1637         struct ceph_osd_request *osd_req;
1638         u32 i;
1639
1640         obj_request = container_of(kref, struct rbd_obj_request, kref);
1641
1642         dout("%s: obj %p\n", __func__, obj_request);
1643
1644         while (!list_empty(&obj_request->osd_reqs)) {
1645                 osd_req = list_first_entry(&obj_request->osd_reqs,
1646                                     struct ceph_osd_request, r_private_item);
1647                 list_del_init(&osd_req->r_private_item);
1648                 ceph_osdc_put_request(osd_req);
1649         }
1650
1651         switch (obj_request->img_request->data_type) {
1652         case OBJ_REQUEST_NODATA:
1653         case OBJ_REQUEST_BIO:
1654         case OBJ_REQUEST_BVECS:
1655                 break;          /* Nothing to do */
1656         case OBJ_REQUEST_OWN_BVECS:
1657                 kfree(obj_request->bvec_pos.bvecs);
1658                 break;
1659         default:
1660                 BUG();
1661         }
1662
1663         kfree(obj_request->img_extents);
1664         if (obj_request->copyup_bvecs) {
1665                 for (i = 0; i < obj_request->copyup_bvec_count; i++) {
1666                         if (obj_request->copyup_bvecs[i].bv_page)
1667                                 __free_page(obj_request->copyup_bvecs[i].bv_page);
1668                 }
1669                 kfree(obj_request->copyup_bvecs);
1670         }
1671
1672         kmem_cache_free(rbd_obj_request_cache, obj_request);
1673 }
1674
1675 /* It's OK to call this for a device with no parent */
1676
1677 static void rbd_spec_put(struct rbd_spec *spec);
1678 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1679 {
1680         rbd_dev_remove_parent(rbd_dev);
1681         rbd_spec_put(rbd_dev->parent_spec);
1682         rbd_dev->parent_spec = NULL;
1683         rbd_dev->parent_overlap = 0;
1684 }
1685
1686 /*
1687  * Parent image reference counting is used to determine when an
1688  * image's parent fields can be safely torn down--after there are no
1689  * more in-flight requests to the parent image.  When the last
1690  * reference is dropped, cleaning them up is safe.
1691  */
1692 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1693 {
1694         int counter;
1695
1696         if (!rbd_dev->parent_spec)
1697                 return;
1698
1699         counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1700         if (counter > 0)
1701                 return;
1702
1703         /* Last reference; clean up parent data structures */
1704
1705         if (!counter)
1706                 rbd_dev_unparent(rbd_dev);
1707         else
1708                 rbd_warn(rbd_dev, "parent reference underflow");
1709 }
1710
1711 /*
1712  * If an image has a non-zero parent overlap, get a reference to its
1713  * parent.
1714  *
1715  * Returns true if the rbd device has a parent with a non-zero
1716  * overlap and a reference for it was successfully taken, or
1717  * false otherwise.
1718  */
1719 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1720 {
1721         int counter = 0;
1722
1723         if (!rbd_dev->parent_spec)
1724                 return false;
1725
1726         down_read(&rbd_dev->header_rwsem);
1727         if (rbd_dev->parent_overlap)
1728                 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1729         up_read(&rbd_dev->header_rwsem);
1730
1731         if (counter < 0)
1732                 rbd_warn(rbd_dev, "parent reference overflow");
1733
1734         return counter > 0;
1735 }
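
/*
 * Note that rbd_img_request_create() takes this reference for layered
 * requests and rbd_img_request_destroy() drops it.  Because
 * atomic_inc_return_safe() refuses to bump a counter that has already
 * reached 0, a request racing with the final put (and the
 * rbd_dev_unparent() it triggers) simply fails the get and proceeds as
 * if there were no parent, instead of reviving a parent_spec that is
 * being torn down.
 */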
1736
1737 /*
1738  * Caller is responsible for filling in the list of object requests
1739  * that comprises the image request, and the Linux request pointer
1740  * (if there is one).
1741  */
1742 static struct rbd_img_request *rbd_img_request_create(
1743                                         struct rbd_device *rbd_dev,
1744                                         enum obj_operation_type op_type,
1745                                         struct ceph_snap_context *snapc)
1746 {
1747         struct rbd_img_request *img_request;
1748
1749         img_request = kmem_cache_zalloc(rbd_img_request_cache, GFP_NOIO);
1750         if (!img_request)
1751                 return NULL;
1752
1753         img_request->rbd_dev = rbd_dev;
1754         img_request->op_type = op_type;
1755         if (!rbd_img_is_write(img_request))
1756                 img_request->snap_id = rbd_dev->spec->snap_id;
1757         else
1758                 img_request->snapc = snapc;
1759
1760         if (rbd_dev_parent_get(rbd_dev))
1761                 img_request_layered_set(img_request);
1762
1763         INIT_LIST_HEAD(&img_request->lock_item);
1764         INIT_LIST_HEAD(&img_request->object_extents);
1765         mutex_init(&img_request->state_mutex);
1766         kref_init(&img_request->kref);
1767
1768         return img_request;
1769 }
1770
1771 static void rbd_img_request_destroy(struct kref *kref)
1772 {
1773         struct rbd_img_request *img_request;
1774         struct rbd_obj_request *obj_request;
1775         struct rbd_obj_request *next_obj_request;
1776
1777         img_request = container_of(kref, struct rbd_img_request, kref);
1778
1779         dout("%s: img %p\n", __func__, img_request);
1780
1781         WARN_ON(!list_empty(&img_request->lock_item));
1782         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1783                 rbd_img_obj_request_del(img_request, obj_request);
1784
1785         if (img_request_layered_test(img_request)) {
1786                 img_request_layered_clear(img_request);
1787                 rbd_dev_parent_put(img_request->rbd_dev);
1788         }
1789
1790         if (rbd_img_is_write(img_request))
1791                 ceph_put_snap_context(img_request->snapc);
1792
1793         kmem_cache_free(rbd_img_request_cache, img_request);
1794 }
1795
1796 #define BITS_PER_OBJ    2
1797 #define OBJS_PER_BYTE   (BITS_PER_BYTE / BITS_PER_OBJ)
1798 #define OBJ_MASK        ((1 << BITS_PER_OBJ) - 1)
1799
1800 static void __rbd_object_map_index(struct rbd_device *rbd_dev, u64 objno,
1801                                    u64 *index, u8 *shift)
1802 {
1803         u32 off;
1804
1805         rbd_assert(objno < rbd_dev->object_map_size);
1806         *index = div_u64_rem(objno, OBJS_PER_BYTE, &off);
1807         *shift = (OBJS_PER_BYTE - off - 1) * BITS_PER_OBJ;
1808 }
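
/*
 * Worked example: each object gets BITS_PER_OBJ == 2 state bits, four
 * objects per byte, packed high bits first.  For objno == 6:
 * index = 6 / 4 = 1, off = 6 % 4 = 2, shift = (4 - 2 - 1) * 2 = 2, so
 * its state is (object_map[1] >> 2) & 0x3.  Object 4 occupies bits 7:6
 * of the same byte and object 7 bits 1:0.
 */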
1809
1810 static u8 __rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1811 {
1812         u64 index;
1813         u8 shift;
1814
1815         lockdep_assert_held(&rbd_dev->object_map_lock);
1816         __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1817         return (rbd_dev->object_map[index] >> shift) & OBJ_MASK;
1818 }
1819
1820 static void __rbd_object_map_set(struct rbd_device *rbd_dev, u64 objno, u8 val)
1821 {
1822         u64 index;
1823         u8 shift;
1824         u8 *p;
1825
1826         lockdep_assert_held(&rbd_dev->object_map_lock);
1827         rbd_assert(!(val & ~OBJ_MASK));
1828
1829         __rbd_object_map_index(rbd_dev, objno, &index, &shift);
1830         p = &rbd_dev->object_map[index];
1831         *p = (*p & ~(OBJ_MASK << shift)) | (val << shift);
1832 }
1833
1834 static u8 rbd_object_map_get(struct rbd_device *rbd_dev, u64 objno)
1835 {
1836         u8 state;
1837
1838         spin_lock(&rbd_dev->object_map_lock);
1839         state = __rbd_object_map_get(rbd_dev, objno);
1840         spin_unlock(&rbd_dev->object_map_lock);
1841         return state;
1842 }
1843
1844 static bool use_object_map(struct rbd_device *rbd_dev)
1845 {
1846         return ((rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) &&
1847                 !(rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID));
1848 }
1849
1850 static bool rbd_object_map_may_exist(struct rbd_device *rbd_dev, u64 objno)
1851 {
1852         u8 state;
1853
1854         /* fall back to default logic if object map is disabled or invalid */
1855         if (!use_object_map(rbd_dev))
1856                 return true;
1857
1858         state = rbd_object_map_get(rbd_dev, objno);
1859         return state != OBJECT_NONEXISTENT;
1860 }
1861
1862 static void rbd_object_map_name(struct rbd_device *rbd_dev, u64 snap_id,
1863                                 struct ceph_object_id *oid)
1864 {
1865         if (snap_id == CEPH_NOSNAP)
1866                 ceph_oid_printf(oid, "%s%s", RBD_OBJECT_MAP_PREFIX,
1867                                 rbd_dev->spec->image_id);
1868         else
1869                 ceph_oid_printf(oid, "%s%s.%016llx", RBD_OBJECT_MAP_PREFIX,
1870                                 rbd_dev->spec->image_id, snap_id);
1871 }
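
/*
 * For example, for a hypothetical image id "abc123" this yields
 * "<RBD_OBJECT_MAP_PREFIX>abc123" for the HEAD object map and
 * "<RBD_OBJECT_MAP_PREFIX>abc123.000000000000001e" for the object map
 * of snapshot 0x1e (snap id formatted as %016llx).
 */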
1872
1873 static int rbd_object_map_lock(struct rbd_device *rbd_dev)
1874 {
1875         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1876         CEPH_DEFINE_OID_ONSTACK(oid);
1877         u8 lock_type;
1878         char *lock_tag;
1879         struct ceph_locker *lockers;
1880         u32 num_lockers;
1881         bool broke_lock = false;
1882         int ret;
1883
1884         rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1885
1886 again:
1887         ret = ceph_cls_lock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1888                             CEPH_CLS_LOCK_EXCLUSIVE, "", "", "", 0);
1889         if (ret != -EBUSY || broke_lock) {
1890                 if (ret == -EEXIST)
1891                         ret = 0; /* already locked by myself */
1892                 if (ret)
1893                         rbd_warn(rbd_dev, "failed to lock object map: %d", ret);
1894                 return ret;
1895         }
1896
1897         ret = ceph_cls_lock_info(osdc, &oid, &rbd_dev->header_oloc,
1898                                  RBD_LOCK_NAME, &lock_type, &lock_tag,
1899                                  &lockers, &num_lockers);
1900         if (ret) {
1901                 if (ret == -ENOENT)
1902                         goto again;
1903
1904                 rbd_warn(rbd_dev, "failed to get object map lockers: %d", ret);
1905                 return ret;
1906         }
1907
1908         kfree(lock_tag);
1909         if (num_lockers == 0)
1910                 goto again;
1911
1912         rbd_warn(rbd_dev, "breaking object map lock owned by %s%llu",
1913                  ENTITY_NAME(lockers[0].id.name));
1914
1915         ret = ceph_cls_break_lock(osdc, &oid, &rbd_dev->header_oloc,
1916                                   RBD_LOCK_NAME, lockers[0].id.cookie,
1917                                   &lockers[0].id.name);
1918         ceph_free_lockers(lockers, num_lockers);
1919         if (ret) {
1920                 if (ret == -ENOENT)
1921                         goto again;
1922
1923                 rbd_warn(rbd_dev, "failed to break object map lock: %d", ret);
1924                 return ret;
1925         }
1926
1927         broke_lock = true;
1928         goto again;
1929 }
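
/*
 * rbd_object_map_lock() in brief: the exclusive lock is attempted
 * first; on -EBUSY (and only once, tracked by broke_lock) the current
 * owner is looked up and its lock broken, then the acquisition is
 * retried.  -ENOENT from the info/break calls or an empty locker list
 * means the owner went away on its own, so we simply retry; -EEXIST
 * from ceph_cls_lock() means we already hold the lock.
 */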
1930
1931 static void rbd_object_map_unlock(struct rbd_device *rbd_dev)
1932 {
1933         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1934         CEPH_DEFINE_OID_ONSTACK(oid);
1935         int ret;
1936
1937         rbd_object_map_name(rbd_dev, CEPH_NOSNAP, &oid);
1938
1939         ret = ceph_cls_unlock(osdc, &oid, &rbd_dev->header_oloc, RBD_LOCK_NAME,
1940                               "");
1941         if (ret && ret != -ENOENT)
1942                 rbd_warn(rbd_dev, "failed to unlock object map: %d", ret);
1943 }
1944
1945 static int decode_object_map_header(void **p, void *end, u64 *object_map_size)
1946 {
1947         u8 struct_v;
1948         u32 struct_len;
1949         u32 header_len;
1950         void *header_end;
1951         int ret;
1952
1953         ceph_decode_32_safe(p, end, header_len, e_inval);
1954         header_end = *p + header_len;
1955
1956         ret = ceph_start_decoding(p, end, 1, "BitVector header", &struct_v,
1957                                   &struct_len);
1958         if (ret)
1959                 return ret;
1960
1961         ceph_decode_64_safe(p, end, *object_map_size, e_inval);
1962
1963         *p = header_end;
1964         return 0;
1965
1966 e_inval:
1967         return -EINVAL;
1968 }
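
/*
 * Informally, the reply parsed above starts with a BitVector header:
 *
 *     le32 header_len                    - length of the header blob
 *     u8 struct_v, u8 compat, le32 len   - ceph_start_decoding() fields
 *     le64 object_map_size               - number of objects
 *     ...                                - rest of the header, skipped
 *
 * *p is advanced to header_end, i.e. past the whole header, so the
 * caller continues at the start of the per-object state bits.
 */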
1969
1970 static int __rbd_object_map_load(struct rbd_device *rbd_dev)
1971 {
1972         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1973         CEPH_DEFINE_OID_ONSTACK(oid);
1974         struct page **pages;
1975         void *p, *end;
1976         size_t reply_len;
1977         u64 num_objects;
1978         u64 object_map_bytes;
1979         u64 object_map_size;
1980         int num_pages;
1981         int ret;
1982
1983         rbd_assert(!rbd_dev->object_map && !rbd_dev->object_map_size);
1984
1985         num_objects = ceph_get_num_objects(&rbd_dev->layout,
1986                                            rbd_dev->mapping.size);
1987         object_map_bytes = DIV_ROUND_UP_ULL(num_objects * BITS_PER_OBJ,
1988                                             BITS_PER_BYTE);
1989         num_pages = calc_pages_for(0, object_map_bytes) + 1;
1990         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1991         if (IS_ERR(pages))
1992                 return PTR_ERR(pages);
1993
1994         reply_len = num_pages * PAGE_SIZE;
1995         rbd_object_map_name(rbd_dev, rbd_dev->spec->snap_id, &oid);
1996         ret = ceph_osdc_call(osdc, &oid, &rbd_dev->header_oloc,
1997                              "rbd", "object_map_load", CEPH_OSD_FLAG_READ,
1998                              NULL, 0, pages, &reply_len);
1999         if (ret)
2000                 goto out;
2001
2002         p = page_address(pages[0]);
2003         end = p + min(reply_len, (size_t)PAGE_SIZE);
2004         ret = decode_object_map_header(&p, end, &object_map_size);
2005         if (ret)
2006                 goto out;
2007
2008         if (object_map_size != num_objects) {
2009                 rbd_warn(rbd_dev, "object map size mismatch: %llu vs %llu",
2010                          object_map_size, num_objects);
2011                 ret = -EINVAL;
2012                 goto out;
2013         }
2014
2015         if (offset_in_page(p) + object_map_bytes > reply_len) {
2016                 ret = -EINVAL;
2017                 goto out;
2018         }
2019
2020         rbd_dev->object_map = kvmalloc(object_map_bytes, GFP_KERNEL);
2021         if (!rbd_dev->object_map) {
2022                 ret = -ENOMEM;
2023                 goto out;
2024         }
2025
2026         rbd_dev->object_map_size = object_map_size;
2027         ceph_copy_from_page_vector(pages, rbd_dev->object_map,
2028                                    offset_in_page(p), object_map_bytes);
2029
2030 out:
2031         ceph_release_page_vector(pages, num_pages);
2032         return ret;
2033 }
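
/*
 * Sizing example: for a hypothetical 1T mapping with the default 4M
 * objects, num_objects = 262144 and object_map_bytes =
 * DIV_ROUND_UP(262144 * 2, 8) = 65536, i.e. 16 pages of 4K plus the
 * extra page, which presumably leaves room for the BitVector header
 * that precedes the state bits in the reply.
 */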
2034
2035 static void rbd_object_map_free(struct rbd_device *rbd_dev)
2036 {
2037         kvfree(rbd_dev->object_map);
2038         rbd_dev->object_map = NULL;
2039         rbd_dev->object_map_size = 0;
2040 }
2041
2042 static int rbd_object_map_load(struct rbd_device *rbd_dev)
2043 {
2044         int ret;
2045
2046         ret = __rbd_object_map_load(rbd_dev);
2047         if (ret)
2048                 return ret;
2049
2050         ret = rbd_dev_v2_get_flags(rbd_dev);
2051         if (ret) {
2052                 rbd_object_map_free(rbd_dev);
2053                 return ret;
2054         }
2055
2056         if (rbd_dev->object_map_flags & RBD_FLAG_OBJECT_MAP_INVALID)
2057                 rbd_warn(rbd_dev, "object map is invalid");
2058
2059         return 0;
2060 }
2061
2062 static int rbd_object_map_open(struct rbd_device *rbd_dev)
2063 {
2064         int ret;
2065
2066         ret = rbd_object_map_lock(rbd_dev);
2067         if (ret)
2068                 return ret;
2069
2070         ret = rbd_object_map_load(rbd_dev);
2071         if (ret) {
2072                 rbd_object_map_unlock(rbd_dev);
2073                 return ret;
2074         }
2075
2076         return 0;
2077 }
2078
2079 static void rbd_object_map_close(struct rbd_device *rbd_dev)
2080 {
2081         rbd_object_map_free(rbd_dev);
2082         rbd_object_map_unlock(rbd_dev);
2083 }
2084
2085 /*
2086  * This function needs snap_id (or more precisely just something to
2087  * distinguish between HEAD and snapshot object maps), new_state and
2088  * current_state that were passed to rbd_object_map_update().
2089  *
2090  * To avoid allocating and stashing a context we piggyback on the OSD
2091  * request.  A HEAD update has two ops (assert_locked + the update itself);
2092  * a snapshot update has only the update op.  For new_state and current_state
2093  * we decode our own object_map_update op, encoded in rbd_cls_object_map_update().
2094  */
2095 static int rbd_object_map_update_finish(struct rbd_obj_request *obj_req,
2096                                         struct ceph_osd_request *osd_req)
2097 {
2098         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2099         struct ceph_osd_data *osd_data;
2100         u64 objno;
2101         u8 state, new_state, uninitialized_var(current_state);
2102         bool has_current_state;
2103         void *p;
2104
2105         if (osd_req->r_result)
2106                 return osd_req->r_result;
2107
2108         /*
2109          * Nothing to do for a snapshot object map.
2110          */
2111         if (osd_req->r_num_ops == 1)
2112                 return 0;
2113
2114         /*
2115          * Update in-memory HEAD object map.
2116          */
2117         rbd_assert(osd_req->r_num_ops == 2);
2118         osd_data = osd_req_op_data(osd_req, 1, cls, request_data);
2119         rbd_assert(osd_data->type == CEPH_OSD_DATA_TYPE_PAGES);
2120
2121         p = page_address(osd_data->pages[0]);
2122         objno = ceph_decode_64(&p);
2123         rbd_assert(objno == obj_req->ex.oe_objno);
2124         rbd_assert(ceph_decode_64(&p) == objno + 1);
2125         new_state = ceph_decode_8(&p);
2126         has_current_state = ceph_decode_8(&p);
2127         if (has_current_state)
2128                 current_state = ceph_decode_8(&p);
2129
2130         spin_lock(&rbd_dev->object_map_lock);
2131         state = __rbd_object_map_get(rbd_dev, objno);
2132         if (!has_current_state || current_state == state ||
2133             (current_state == OBJECT_EXISTS && state == OBJECT_EXISTS_CLEAN))
2134                 __rbd_object_map_set(rbd_dev, objno, new_state);
2135         spin_unlock(&rbd_dev->object_map_lock);
2136
2137         return 0;
2138 }
2139
2140 static void rbd_object_map_callback(struct ceph_osd_request *osd_req)
2141 {
2142         struct rbd_obj_request *obj_req = osd_req->r_priv;
2143         int result;
2144
2145         dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
2146              osd_req->r_result, obj_req);
2147
2148         result = rbd_object_map_update_finish(obj_req, osd_req);
2149         rbd_obj_handle_request(obj_req, result);
2150 }
2151
2152 static bool update_needed(struct rbd_device *rbd_dev, u64 objno, u8 new_state)
2153 {
2154         u8 state = rbd_object_map_get(rbd_dev, objno);
2155
2156         if (state == new_state ||
2157             (new_state == OBJECT_PENDING && state == OBJECT_NONEXISTENT) ||
2158             (new_state == OBJECT_NONEXISTENT && state != OBJECT_PENDING))
2159                 return false;
2160
2161         return true;
2162 }
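
/*
 * In other words, besides the trivial "already in the desired state"
 * case, two transitions are skipped as pointless: a NONEXISTENT object
 * does not need to be marked PENDING (the marker the pre-update uses
 * for deletions), and only objects currently marked PENDING are moved
 * to NONEXISTENT.
 */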
2163
2164 static int rbd_cls_object_map_update(struct ceph_osd_request *req,
2165                                      int which, u64 objno, u8 new_state,
2166                                      const u8 *current_state)
2167 {
2168         struct page **pages;
2169         void *p, *start;
2170         int ret;
2171
2172         ret = osd_req_op_cls_init(req, which, "rbd", "object_map_update");
2173         if (ret)
2174                 return ret;
2175
2176         pages = ceph_alloc_page_vector(1, GFP_NOIO);
2177         if (IS_ERR(pages))
2178                 return PTR_ERR(pages);
2179
2180         p = start = page_address(pages[0]);
2181         ceph_encode_64(&p, objno);
2182         ceph_encode_64(&p, objno + 1);
2183         ceph_encode_8(&p, new_state);
2184         if (current_state) {
2185                 ceph_encode_8(&p, 1);
2186                 ceph_encode_8(&p, *current_state);
2187         } else {
2188                 ceph_encode_8(&p, 0);
2189         }
2190
2191         osd_req_op_cls_request_data_pages(req, which, pages, p - start, 0,
2192                                           false, true);
2193         return 0;
2194 }
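
/*
 * The payload built above is exactly what rbd_object_map_update_finish()
 * later re-decodes off the completed OSD request:
 *
 *     le64 start objno, le64 end objno (exclusive, always objno + 1),
 *     u8 new_state, u8 has_current_state, [u8 current_state]
 */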
2195
2196 /*
2197  * Return:
2198  *   0 - object map update sent
2199  *   1 - object map update isn't needed
2200  *  <0 - error
2201  */
2202 static int rbd_object_map_update(struct rbd_obj_request *obj_req, u64 snap_id,
2203                                  u8 new_state, const u8 *current_state)
2204 {
2205         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2206         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2207         struct ceph_osd_request *req;
2208         int num_ops = 1;
2209         int which = 0;
2210         int ret;
2211
2212         if (snap_id == CEPH_NOSNAP) {
2213                 if (!update_needed(rbd_dev, obj_req->ex.oe_objno, new_state))
2214                         return 1;
2215
2216                 num_ops++; /* assert_locked */
2217         }
2218
2219         req = ceph_osdc_alloc_request(osdc, NULL, num_ops, false, GFP_NOIO);
2220         if (!req)
2221                 return -ENOMEM;
2222
2223         list_add_tail(&req->r_private_item, &obj_req->osd_reqs);
2224         req->r_callback = rbd_object_map_callback;
2225         req->r_priv = obj_req;
2226
2227         rbd_object_map_name(rbd_dev, snap_id, &req->r_base_oid);
2228         ceph_oloc_copy(&req->r_base_oloc, &rbd_dev->header_oloc);
2229         req->r_flags = CEPH_OSD_FLAG_WRITE;
2230         ktime_get_real_ts64(&req->r_mtime);
2231
2232         if (snap_id == CEPH_NOSNAP) {
2233                 /*
2234                  * Protect against possible race conditions during lock
2235                  * ownership transitions.
2236                  */
2237                 ret = ceph_cls_assert_locked(req, which++, RBD_LOCK_NAME,
2238                                              CEPH_CLS_LOCK_EXCLUSIVE, "", "");
2239                 if (ret)
2240                         return ret;
2241         }
2242
2243         ret = rbd_cls_object_map_update(req, which, obj_req->ex.oe_objno,
2244                                         new_state, current_state);
2245         if (ret)
2246                 return ret;
2247
2248         ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
2249         if (ret)
2250                 return ret;
2251
2252         ceph_osdc_start_request(osdc, req, false);
2253         return 0;
2254 }
2255
2256 static void prune_extents(struct ceph_file_extent *img_extents,
2257                           u32 *num_img_extents, u64 overlap)
2258 {
2259         u32 cnt = *num_img_extents;
2260
2261         /* drop extents completely beyond the overlap */
2262         while (cnt && img_extents[cnt - 1].fe_off >= overlap)
2263                 cnt--;
2264
2265         if (cnt) {
2266                 struct ceph_file_extent *ex = &img_extents[cnt - 1];
2267
2268                 /* trim final overlapping extent */
2269                 if (ex->fe_off + ex->fe_len > overlap)
2270                         ex->fe_len = overlap - ex->fe_off;
2271         }
2272
2273         *num_img_extents = cnt;
2274 }
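
/*
 * Worked example: with overlap = 10M and (sorted) extents
 * 8M~1M, 9.5M~1M and 12M~1M, the 12M extent starts at or beyond the
 * overlap and is dropped, the 9.5M extent is trimmed to 9.5M~0.5M and
 * the 8M extent is kept untouched, leaving *num_img_extents == 2.
 */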
2275
2276 /*
2277  * Determine the byte range(s) covered by either just the object extent
2278  * or the entire object in the parent image.
2279  */
2280 static int rbd_obj_calc_img_extents(struct rbd_obj_request *obj_req,
2281                                     bool entire)
2282 {
2283         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2284         int ret;
2285
2286         if (!rbd_dev->parent_overlap)
2287                 return 0;
2288
2289         ret = ceph_extent_to_file(&rbd_dev->layout, obj_req->ex.oe_objno,
2290                                   entire ? 0 : obj_req->ex.oe_off,
2291                                   entire ? rbd_dev->layout.object_size :
2292                                                         obj_req->ex.oe_len,
2293                                   &obj_req->img_extents,
2294                                   &obj_req->num_img_extents);
2295         if (ret)
2296                 return ret;
2297
2298         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
2299                       rbd_dev->parent_overlap);
2300         return 0;
2301 }
2302
2303 static void rbd_osd_setup_data(struct ceph_osd_request *osd_req, int which)
2304 {
2305         struct rbd_obj_request *obj_req = osd_req->r_priv;
2306
2307         switch (obj_req->img_request->data_type) {
2308         case OBJ_REQUEST_BIO:
2309                 osd_req_op_extent_osd_data_bio(osd_req, which,
2310                                                &obj_req->bio_pos,
2311                                                obj_req->ex.oe_len);
2312                 break;
2313         case OBJ_REQUEST_BVECS:
2314         case OBJ_REQUEST_OWN_BVECS:
2315                 rbd_assert(obj_req->bvec_pos.iter.bi_size ==
2316                                                         obj_req->ex.oe_len);
2317                 rbd_assert(obj_req->bvec_idx == obj_req->bvec_count);
2318                 osd_req_op_extent_osd_data_bvec_pos(osd_req, which,
2319                                                     &obj_req->bvec_pos);
2320                 break;
2321         default:
2322                 BUG();
2323         }
2324 }
2325
2326 static int rbd_osd_setup_stat(struct ceph_osd_request *osd_req, int which)
2327 {
2328         struct page **pages;
2329
2330         /*
2331          * The response data for a STAT call consists of:
2332          *     le64 length;
2333          *     struct {
2334          *         le32 tv_sec;
2335          *         le32 tv_nsec;
2336          *     } mtime;
2337          */
2338         pages = ceph_alloc_page_vector(1, GFP_NOIO);
2339         if (IS_ERR(pages))
2340                 return PTR_ERR(pages);
2341
2342         osd_req_op_init(osd_req, which, CEPH_OSD_OP_STAT, 0);
2343         osd_req_op_raw_data_in_pages(osd_req, which, pages,
2344                                      8 + sizeof(struct ceph_timespec),
2345                                      0, false, true);
2346         return 0;
2347 }
2348
2349 static int rbd_osd_setup_copyup(struct ceph_osd_request *osd_req, int which,
2350                                 u32 bytes)
2351 {
2352         struct rbd_obj_request *obj_req = osd_req->r_priv;
2353         int ret;
2354
2355         ret = osd_req_op_cls_init(osd_req, which, "rbd", "copyup");
2356         if (ret)
2357                 return ret;
2358
2359         osd_req_op_cls_request_data_bvecs(osd_req, which, obj_req->copyup_bvecs,
2360                                           obj_req->copyup_bvec_count, bytes);
2361         return 0;
2362 }
2363
2364 static int rbd_obj_init_read(struct rbd_obj_request *obj_req)
2365 {
2366         obj_req->read_state = RBD_OBJ_READ_START;
2367         return 0;
2368 }
2369
2370 static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2371                                       int which)
2372 {
2373         struct rbd_obj_request *obj_req = osd_req->r_priv;
2374         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2375         u16 opcode;
2376
2377         if (!use_object_map(rbd_dev) ||
2378             !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
2379                 osd_req_op_alloc_hint_init(osd_req, which++,
2380                                            rbd_dev->layout.object_size,
2381                                            rbd_dev->layout.object_size);
2382         }
2383
2384         if (rbd_obj_is_entire(obj_req))
2385                 opcode = CEPH_OSD_OP_WRITEFULL;
2386         else
2387                 opcode = CEPH_OSD_OP_WRITE;
2388
2389         osd_req_op_extent_init(osd_req, which, opcode,
2390                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2391         rbd_osd_setup_data(osd_req, which);
2392 }
2393
2394 static int rbd_obj_init_write(struct rbd_obj_request *obj_req)
2395 {
2396         int ret;
2397
2398         /* reverse map the entire object onto the parent */
2399         ret = rbd_obj_calc_img_extents(obj_req, true);
2400         if (ret)
2401                 return ret;
2402
2403         if (rbd_obj_copyup_enabled(obj_req))
2404                 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2405
2406         obj_req->write_state = RBD_OBJ_WRITE_START;
2407         return 0;
2408 }
2409
2410 static u16 truncate_or_zero_opcode(struct rbd_obj_request *obj_req)
2411 {
2412         return rbd_obj_is_tail(obj_req) ? CEPH_OSD_OP_TRUNCATE :
2413                                           CEPH_OSD_OP_ZERO;
2414 }
2415
2416 static void __rbd_osd_setup_discard_ops(struct ceph_osd_request *osd_req,
2417                                         int which)
2418 {
2419         struct rbd_obj_request *obj_req = osd_req->r_priv;
2420
2421         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents) {
2422                 rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2423                 osd_req_op_init(osd_req, which, CEPH_OSD_OP_DELETE, 0);
2424         } else {
2425                 osd_req_op_extent_init(osd_req, which,
2426                                        truncate_or_zero_opcode(obj_req),
2427                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2428                                        0, 0);
2429         }
2430 }
2431
2432 static int rbd_obj_init_discard(struct rbd_obj_request *obj_req)
2433 {
2434         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2435         u64 off, next_off;
2436         int ret;
2437
2438         /*
2439          * Align the range to alloc_size boundary and punt on discards
2440          * that are too small to free up any space.
2441          *
2442          * alloc_size == object_size && is_tail() is a special case for
2443          * filestore with filestore_punch_hole = false, needed to allow
2444          * truncate (in addition to delete).
2445          */
2446         if (rbd_dev->opts->alloc_size != rbd_dev->layout.object_size ||
2447             !rbd_obj_is_tail(obj_req)) {
2448                 off = round_up(obj_req->ex.oe_off, rbd_dev->opts->alloc_size);
2449                 next_off = round_down(obj_req->ex.oe_off + obj_req->ex.oe_len,
2450                                       rbd_dev->opts->alloc_size);
2451                 if (off >= next_off)
2452                         return 1;
2453
2454                 dout("%s %p %llu~%llu -> %llu~%llu\n", __func__,
2455                      obj_req, obj_req->ex.oe_off, obj_req->ex.oe_len,
2456                      off, next_off - off);
2457                 obj_req->ex.oe_off = off;
2458                 obj_req->ex.oe_len = next_off - off;
2459         }
2460
2461         /* reverse map the entire object onto the parent */
2462         ret = rbd_obj_calc_img_extents(obj_req, true);
2463         if (ret)
2464                 return ret;
2465
2466         obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2467         if (rbd_obj_is_entire(obj_req) && !obj_req->num_img_extents)
2468                 obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2469
2470         obj_req->write_state = RBD_OBJ_WRITE_START;
2471         return 0;
2472 }
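
/*
 * Worked example: assuming alloc_size = 64K, a discard of 4K~64K within
 * an object gives off = round_up(4K, 64K) = 64K and
 * next_off = round_down(68K, 64K) = 64K; off >= next_off, so the
 * request is dropped (return 1, the caller deletes the object request)
 * because it cannot free a whole allocation unit.  A discard of 0~68K
 * is instead shrunk to 0~64K.
 */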
2473
2474 static void __rbd_osd_setup_zeroout_ops(struct ceph_osd_request *osd_req,
2475                                         int which)
2476 {
2477         struct rbd_obj_request *obj_req = osd_req->r_priv;
2478         u16 opcode;
2479
2480         if (rbd_obj_is_entire(obj_req)) {
2481                 if (obj_req->num_img_extents) {
2482                         if (!(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2483                                 osd_req_op_init(osd_req, which++,
2484                                                 CEPH_OSD_OP_CREATE, 0);
2485                         opcode = CEPH_OSD_OP_TRUNCATE;
2486                 } else {
2487                         rbd_assert(obj_req->flags & RBD_OBJ_FLAG_DELETION);
2488                         osd_req_op_init(osd_req, which++,
2489                                         CEPH_OSD_OP_DELETE, 0);
2490                         opcode = 0;
2491                 }
2492         } else {
2493                 opcode = truncate_or_zero_opcode(obj_req);
2494         }
2495
2496         if (opcode)
2497                 osd_req_op_extent_init(osd_req, which, opcode,
2498                                        obj_req->ex.oe_off, obj_req->ex.oe_len,
2499                                        0, 0);
2500 }
2501
2502 static int rbd_obj_init_zeroout(struct rbd_obj_request *obj_req)
2503 {
2504         int ret;
2505
2506         /* reverse map the entire object onto the parent */
2507         ret = rbd_obj_calc_img_extents(obj_req, true);
2508         if (ret)
2509                 return ret;
2510
2511         if (rbd_obj_copyup_enabled(obj_req))
2512                 obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ENABLED;
2513         if (!obj_req->num_img_extents) {
2514                 obj_req->flags |= RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT;
2515                 if (rbd_obj_is_entire(obj_req))
2516                         obj_req->flags |= RBD_OBJ_FLAG_DELETION;
2517         }
2518
2519         obj_req->write_state = RBD_OBJ_WRITE_START;
2520         return 0;
2521 }
2522
2523 static int count_write_ops(struct rbd_obj_request *obj_req)
2524 {
2525         struct rbd_img_request *img_req = obj_req->img_request;
2526
2527         switch (img_req->op_type) {
2528         case OBJ_OP_WRITE:
2529                 if (!use_object_map(img_req->rbd_dev) ||
2530                     !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST))
2531                         return 2; /* setallochint + write/writefull */
2532
2533                 return 1; /* write/writefull */
2534         case OBJ_OP_DISCARD:
2535                 return 1; /* delete/truncate/zero */
2536         case OBJ_OP_ZEROOUT:
2537                 if (rbd_obj_is_entire(obj_req) && obj_req->num_img_extents &&
2538                     !(obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED))
2539                         return 2; /* create + truncate */
2540
2541                 return 1; /* delete/truncate/zero */
2542         default:
2543                 BUG();
2544         }
2545 }
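
/*
 * The counts above must mirror what the corresponding
 * __rbd_osd_setup_{write,discard,zeroout}_ops() helper will actually
 * initialize: rbd_obj_write_object() allocates the OSD request with
 * exactly this many ops, plus one leading stat op when a guarded
 * copyup write is in play.
 */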
2546
2547 static void rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
2548                                     int which)
2549 {
2550         struct rbd_obj_request *obj_req = osd_req->r_priv;
2551
2552         switch (obj_req->img_request->op_type) {
2553         case OBJ_OP_WRITE:
2554                 __rbd_osd_setup_write_ops(osd_req, which);
2555                 break;
2556         case OBJ_OP_DISCARD:
2557                 __rbd_osd_setup_discard_ops(osd_req, which);
2558                 break;
2559         case OBJ_OP_ZEROOUT:
2560                 __rbd_osd_setup_zeroout_ops(osd_req, which);
2561                 break;
2562         default:
2563                 BUG();
2564         }
2565 }
2566
2567 /*
2568  * Prune the list of object requests (adjust offset and/or length, drop
2569  * redundant requests).  Prepare object request state machines and image
2570  * request state machine for execution.
2571  */
2572 static int __rbd_img_fill_request(struct rbd_img_request *img_req)
2573 {
2574         struct rbd_obj_request *obj_req, *next_obj_req;
2575         int ret;
2576
2577         for_each_obj_request_safe(img_req, obj_req, next_obj_req) {
2578                 switch (img_req->op_type) {
2579                 case OBJ_OP_READ:
2580                         ret = rbd_obj_init_read(obj_req);
2581                         break;
2582                 case OBJ_OP_WRITE:
2583                         ret = rbd_obj_init_write(obj_req);
2584                         break;
2585                 case OBJ_OP_DISCARD:
2586                         ret = rbd_obj_init_discard(obj_req);
2587                         break;
2588                 case OBJ_OP_ZEROOUT:
2589                         ret = rbd_obj_init_zeroout(obj_req);
2590                         break;
2591                 default:
2592                         BUG();
2593                 }
2594                 if (ret < 0)
2595                         return ret;
2596                 if (ret > 0) {
2597                         rbd_img_obj_request_del(img_req, obj_req);
2598                         continue;
2599                 }
2600         }
2601
2602         img_req->state = RBD_IMG_START;
2603         return 0;
2604 }
2605
2606 union rbd_img_fill_iter {
2607         struct ceph_bio_iter    bio_iter;
2608         struct ceph_bvec_iter   bvec_iter;
2609 };
2610
2611 struct rbd_img_fill_ctx {
2612         enum obj_request_type   pos_type;
2613         union rbd_img_fill_iter *pos;
2614         union rbd_img_fill_iter iter;
2615         ceph_object_extent_fn_t set_pos_fn;
2616         ceph_object_extent_fn_t count_fn;
2617         ceph_object_extent_fn_t copy_fn;
2618 };
2619
2620 static struct ceph_object_extent *alloc_object_extent(void *arg)
2621 {
2622         struct rbd_img_request *img_req = arg;
2623         struct rbd_obj_request *obj_req;
2624
2625         obj_req = rbd_obj_request_create();
2626         if (!obj_req)
2627                 return NULL;
2628
2629         rbd_img_obj_request_add(img_req, obj_req);
2630         return &obj_req->ex;
2631 }
2632
2633 /*
2634  * While su != os && sc == 1 is technically not fancy (it's the same
2635  * layout as su == os && sc == 1), we can't use the nocopy path for it
2636  * because ->set_pos_fn() should be called only once per object.
2637  * ceph_file_to_extents() invokes action_fn once per stripe unit, so
2638  * treat su != os && sc == 1 as fancy.
2639  */
2640 static bool rbd_layout_is_fancy(struct ceph_file_layout *l)
2641 {
2642         return l->stripe_unit != l->object_size;
2643 }
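
/*
 * For example, a layout with su = 64K and os = 4M is treated as fancy
 * here even with sc == 1, for the reason given above, while the default
 * su == os layout takes the nocopy path.
 */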
2644
2645 static int rbd_img_fill_request_nocopy(struct rbd_img_request *img_req,
2646                                        struct ceph_file_extent *img_extents,
2647                                        u32 num_img_extents,
2648                                        struct rbd_img_fill_ctx *fctx)
2649 {
2650         u32 i;
2651         int ret;
2652
2653         img_req->data_type = fctx->pos_type;
2654
2655         /*
2656          * Create object requests and set each object request's starting
2657          * position in the provided bio (list) or bio_vec array.
2658          */
2659         fctx->iter = *fctx->pos;
2660         for (i = 0; i < num_img_extents; i++) {
2661                 ret = ceph_file_to_extents(&img_req->rbd_dev->layout,
2662                                            img_extents[i].fe_off,
2663                                            img_extents[i].fe_len,
2664                                            &img_req->object_extents,
2665                                            alloc_object_extent, img_req,
2666                                            fctx->set_pos_fn, &fctx->iter);
2667                 if (ret)
2668                         return ret;
2669         }
2670
2671         return __rbd_img_fill_request(img_req);
2672 }
2673
2674 /*
2675  * Map a list of image extents to a list of object extents, create the
2676  * corresponding object requests (normally each to a different object,
2677  * but not always) and add them to @img_req.  For each object request,
2678  * set up its data descriptor to point to the corresponding chunk(s) of
2679  * @fctx->pos data buffer.
2680  *
2681  * Because ceph_file_to_extents() will merge adjacent object extents
2682  * together, each object request's data descriptor may point to multiple
2683  * different chunks of @fctx->pos data buffer.
2684  *
2685  * @fctx->pos data buffer is assumed to be large enough.
2686  */
2687 static int rbd_img_fill_request(struct rbd_img_request *img_req,
2688                                 struct ceph_file_extent *img_extents,
2689                                 u32 num_img_extents,
2690                                 struct rbd_img_fill_ctx *fctx)
2691 {
2692         struct rbd_device *rbd_dev = img_req->rbd_dev;
2693         struct rbd_obj_request *obj_req;
2694         u32 i;
2695         int ret;
2696
2697         if (fctx->pos_type == OBJ_REQUEST_NODATA ||
2698             !rbd_layout_is_fancy(&rbd_dev->layout))
2699                 return rbd_img_fill_request_nocopy(img_req, img_extents,
2700                                                    num_img_extents, fctx);
2701
2702         img_req->data_type = OBJ_REQUEST_OWN_BVECS;
2703
2704         /*
2705          * Create object requests and determine ->bvec_count for each object
2706          * request.  Note that ->bvec_count sum over all object requests may
2707          * be greater than the number of bio_vecs in the provided bio (list)
2708          * or bio_vec array because when mapped, those bio_vecs can straddle
2709          * stripe unit boundaries.
2710          */
2711         fctx->iter = *fctx->pos;
2712         for (i = 0; i < num_img_extents; i++) {
2713                 ret = ceph_file_to_extents(&rbd_dev->layout,
2714                                            img_extents[i].fe_off,
2715                                            img_extents[i].fe_len,
2716                                            &img_req->object_extents,
2717                                            alloc_object_extent, img_req,
2718                                            fctx->count_fn, &fctx->iter);
2719                 if (ret)
2720                         return ret;
2721         }
2722
2723         for_each_obj_request(img_req, obj_req) {
2724                 obj_req->bvec_pos.bvecs = kmalloc_array(obj_req->bvec_count,
2725                                               sizeof(*obj_req->bvec_pos.bvecs),
2726                                               GFP_NOIO);
2727                 if (!obj_req->bvec_pos.bvecs)
2728                         return -ENOMEM;
2729         }
2730
2731         /*
2732          * Fill in each object request's private bio_vec array, splitting and
2733          * rearranging the provided bio_vecs in stripe unit chunks as needed.
2734          */
2735         fctx->iter = *fctx->pos;
2736         for (i = 0; i < num_img_extents; i++) {
2737                 ret = ceph_iterate_extents(&rbd_dev->layout,
2738                                            img_extents[i].fe_off,
2739                                            img_extents[i].fe_len,
2740                                            &img_req->object_extents,
2741                                            fctx->copy_fn, &fctx->iter);
2742                 if (ret)
2743                         return ret;
2744         }
2745
2746         return __rbd_img_fill_request(img_req);
2747 }
2748
2749 static int rbd_img_fill_nodata(struct rbd_img_request *img_req,
2750                                u64 off, u64 len)
2751 {
2752         struct ceph_file_extent ex = { off, len };
2753         union rbd_img_fill_iter dummy;
2754         struct rbd_img_fill_ctx fctx = {
2755                 .pos_type = OBJ_REQUEST_NODATA,
2756                 .pos = &dummy,
2757         };
2758
2759         return rbd_img_fill_request(img_req, &ex, 1, &fctx);
2760 }
2761
2762 static void set_bio_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2763 {
2764         struct rbd_obj_request *obj_req =
2765             container_of(ex, struct rbd_obj_request, ex);
2766         struct ceph_bio_iter *it = arg;
2767
2768         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2769         obj_req->bio_pos = *it;
2770         ceph_bio_iter_advance(it, bytes);
2771 }
2772
2773 static void count_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2774 {
2775         struct rbd_obj_request *obj_req =
2776             container_of(ex, struct rbd_obj_request, ex);
2777         struct ceph_bio_iter *it = arg;
2778
2779         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2780         ceph_bio_iter_advance_step(it, bytes, ({
2781                 obj_req->bvec_count++;
2782         }));
2783
2784 }
2785
2786 static void copy_bio_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2787 {
2788         struct rbd_obj_request *obj_req =
2789             container_of(ex, struct rbd_obj_request, ex);
2790         struct ceph_bio_iter *it = arg;
2791
2792         dout("%s objno %llu bytes %u\n", __func__, ex->oe_objno, bytes);
2793         ceph_bio_iter_advance_step(it, bytes, ({
2794                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2795                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2796         }));
2797 }
2798
2799 static int __rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2800                                    struct ceph_file_extent *img_extents,
2801                                    u32 num_img_extents,
2802                                    struct ceph_bio_iter *bio_pos)
2803 {
2804         struct rbd_img_fill_ctx fctx = {
2805                 .pos_type = OBJ_REQUEST_BIO,
2806                 .pos = (union rbd_img_fill_iter *)bio_pos,
2807                 .set_pos_fn = set_bio_pos,
2808                 .count_fn = count_bio_bvecs,
2809                 .copy_fn = copy_bio_bvecs,
2810         };
2811
2812         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2813                                     &fctx);
2814 }
2815
2816 static int rbd_img_fill_from_bio(struct rbd_img_request *img_req,
2817                                  u64 off, u64 len, struct bio *bio)
2818 {
2819         struct ceph_file_extent ex = { off, len };
2820         struct ceph_bio_iter it = { .bio = bio, .iter = bio->bi_iter };
2821
2822         return __rbd_img_fill_from_bio(img_req, &ex, 1, &it);
2823 }
2824
2825 static void set_bvec_pos(struct ceph_object_extent *ex, u32 bytes, void *arg)
2826 {
2827         struct rbd_obj_request *obj_req =
2828             container_of(ex, struct rbd_obj_request, ex);
2829         struct ceph_bvec_iter *it = arg;
2830
2831         obj_req->bvec_pos = *it;
2832         ceph_bvec_iter_shorten(&obj_req->bvec_pos, bytes);
2833         ceph_bvec_iter_advance(it, bytes);
2834 }
2835
2836 static void count_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2837 {
2838         struct rbd_obj_request *obj_req =
2839             container_of(ex, struct rbd_obj_request, ex);
2840         struct ceph_bvec_iter *it = arg;
2841
2842         ceph_bvec_iter_advance_step(it, bytes, ({
2843                 obj_req->bvec_count++;
2844         }));
2845 }
2846
2847 static void copy_bvecs(struct ceph_object_extent *ex, u32 bytes, void *arg)
2848 {
2849         struct rbd_obj_request *obj_req =
2850             container_of(ex, struct rbd_obj_request, ex);
2851         struct ceph_bvec_iter *it = arg;
2852
2853         ceph_bvec_iter_advance_step(it, bytes, ({
2854                 obj_req->bvec_pos.bvecs[obj_req->bvec_idx++] = bv;
2855                 obj_req->bvec_pos.iter.bi_size += bv.bv_len;
2856         }));
2857 }
2858
2859 static int __rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2860                                      struct ceph_file_extent *img_extents,
2861                                      u32 num_img_extents,
2862                                      struct ceph_bvec_iter *bvec_pos)
2863 {
2864         struct rbd_img_fill_ctx fctx = {
2865                 .pos_type = OBJ_REQUEST_BVECS,
2866                 .pos = (union rbd_img_fill_iter *)bvec_pos,
2867                 .set_pos_fn = set_bvec_pos,
2868                 .count_fn = count_bvecs,
2869                 .copy_fn = copy_bvecs,
2870         };
2871
2872         return rbd_img_fill_request(img_req, img_extents, num_img_extents,
2873                                     &fctx);
2874 }
2875
2876 static int rbd_img_fill_from_bvecs(struct rbd_img_request *img_req,
2877                                    struct ceph_file_extent *img_extents,
2878                                    u32 num_img_extents,
2879                                    struct bio_vec *bvecs)
2880 {
2881         struct ceph_bvec_iter it = {
2882                 .bvecs = bvecs,
2883                 .iter = { .bi_size = ceph_file_extents_bytes(img_extents,
2884                                                              num_img_extents) },
2885         };
2886
2887         return __rbd_img_fill_from_bvecs(img_req, img_extents, num_img_extents,
2888                                          &it);
2889 }
2890
2891 static void rbd_img_handle_request_work(struct work_struct *work)
2892 {
2893         struct rbd_img_request *img_req =
2894             container_of(work, struct rbd_img_request, work);
2895
2896         rbd_img_handle_request(img_req, img_req->work_result);
2897 }
2898
2899 static void rbd_img_schedule(struct rbd_img_request *img_req, int result)
2900 {
2901         INIT_WORK(&img_req->work, rbd_img_handle_request_work);
2902         img_req->work_result = result;
2903         queue_work(rbd_wq, &img_req->work);
2904 }
2905
2906 static bool rbd_obj_may_exist(struct rbd_obj_request *obj_req)
2907 {
2908         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2909
2910         if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno)) {
2911                 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
2912                 return true;
2913         }
2914
2915         dout("%s %p objno %llu assuming dne\n", __func__, obj_req,
2916              obj_req->ex.oe_objno);
2917         return false;
2918 }
2919
2920 static int rbd_obj_read_object(struct rbd_obj_request *obj_req)
2921 {
2922         struct ceph_osd_request *osd_req;
2923         int ret;
2924
2925         osd_req = __rbd_obj_add_osd_request(obj_req, NULL, 1);
2926         if (IS_ERR(osd_req))
2927                 return PTR_ERR(osd_req);
2928
2929         osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
2930                                obj_req->ex.oe_off, obj_req->ex.oe_len, 0, 0);
2931         rbd_osd_setup_data(osd_req, 0);
2932         rbd_osd_format_read(osd_req);
2933
2934         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
2935         if (ret)
2936                 return ret;
2937
2938         rbd_osd_submit(osd_req);
2939         return 0;
2940 }
2941
2942 static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req)
2943 {
2944         struct rbd_img_request *img_req = obj_req->img_request;
2945         struct rbd_img_request *child_img_req;
2946         int ret;
2947
2948         child_img_req = rbd_img_request_create(img_req->rbd_dev->parent,
2949                                                OBJ_OP_READ, NULL);
2950         if (!child_img_req)
2951                 return -ENOMEM;
2952
2953         __set_bit(IMG_REQ_CHILD, &child_img_req->flags);
2954         child_img_req->obj_request = obj_req;
2955
2956         dout("%s child_img_req %p for obj_req %p\n", __func__, child_img_req,
2957              obj_req);
2958
2959         if (!rbd_img_is_write(img_req)) {
2960                 switch (img_req->data_type) {
2961                 case OBJ_REQUEST_BIO:
2962                         ret = __rbd_img_fill_from_bio(child_img_req,
2963                                                       obj_req->img_extents,
2964                                                       obj_req->num_img_extents,
2965                                                       &obj_req->bio_pos);
2966                         break;
2967                 case OBJ_REQUEST_BVECS:
2968                 case OBJ_REQUEST_OWN_BVECS:
2969                         ret = __rbd_img_fill_from_bvecs(child_img_req,
2970                                                       obj_req->img_extents,
2971                                                       obj_req->num_img_extents,
2972                                                       &obj_req->bvec_pos);
2973                         break;
2974                 default:
2975                         BUG();
2976                 }
2977         } else {
2978                 ret = rbd_img_fill_from_bvecs(child_img_req,
2979                                               obj_req->img_extents,
2980                                               obj_req->num_img_extents,
2981                                               obj_req->copyup_bvecs);
2982         }
2983         if (ret) {
2984                 rbd_img_request_put(child_img_req);
2985                 return ret;
2986         }
2987
2988         /* avoid parent chain recursion */
2989         rbd_img_schedule(child_img_req, 0);
2990         return 0;
2991 }
2992
2993 static bool rbd_obj_advance_read(struct rbd_obj_request *obj_req, int *result)
2994 {
2995         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
2996         int ret;
2997
2998 again:
2999         switch (obj_req->read_state) {
3000         case RBD_OBJ_READ_START:
3001                 rbd_assert(!*result);
3002
3003                 if (!rbd_obj_may_exist(obj_req)) {
3004                         *result = -ENOENT;
3005                         obj_req->read_state = RBD_OBJ_READ_OBJECT;
3006                         goto again;
3007                 }
3008
3009                 ret = rbd_obj_read_object(obj_req);
3010                 if (ret) {
3011                         *result = ret;
3012                         return true;
3013                 }
3014                 obj_req->read_state = RBD_OBJ_READ_OBJECT;
3015                 return false;
3016         case RBD_OBJ_READ_OBJECT:
3017                 if (*result == -ENOENT && rbd_dev->parent_overlap) {
3018                         /* reverse map this object extent onto the parent */
3019                         ret = rbd_obj_calc_img_extents(obj_req, false);
3020                         if (ret) {
3021                                 *result = ret;
3022                                 return true;
3023                         }
3024                         if (obj_req->num_img_extents) {
3025                                 ret = rbd_obj_read_from_parent(obj_req);
3026                                 if (ret) {
3027                                         *result = ret;
3028                                         return true;
3029                                 }
3030                                 obj_req->read_state = RBD_OBJ_READ_PARENT;
3031                                 return false;
3032                         }
3033                 }
3034
3035                 /*
3036                  * -ENOENT means a hole in the image -- zero-fill the entire
3037                  * length of the request.  A short read also implies zero-fill
3038                  * to the end of the request.
3039                  */
3040                 if (*result == -ENOENT) {
3041                         rbd_obj_zero_range(obj_req, 0, obj_req->ex.oe_len);
3042                         *result = 0;
3043                 } else if (*result >= 0) {
3044                         if (*result < obj_req->ex.oe_len)
3045                                 rbd_obj_zero_range(obj_req, *result,
3046                                                 obj_req->ex.oe_len - *result);
3047                         else
3048                                 rbd_assert(*result == obj_req->ex.oe_len);
3049                         *result = 0;
3050                 }
3051                 return true;
3052         case RBD_OBJ_READ_PARENT:
3053                 /*
3054                  * The parent image is only read up to the overlap -- zero-fill
3055                  * from the overlap to the end of the request.
3056                  */
3057                 if (!*result) {
3058                         u32 obj_overlap = rbd_obj_img_extents_bytes(obj_req);
3059
3060                         if (obj_overlap < obj_req->ex.oe_len)
3061                                 rbd_obj_zero_range(obj_req, obj_overlap,
3062                                             obj_req->ex.oe_len - obj_overlap);
3063                 }
3064                 return true;
3065         default:
3066                 BUG();
3067         }
3068 }
3069
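/*
 * Returns true if the write can be skipped entirely: per the object
 * map the target object doesn't exist and the request is flagged as
 * a no-op for nonexistent objects.
 */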
3070 static bool rbd_obj_write_is_noop(struct rbd_obj_request *obj_req)
3071 {
3072         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3073
3074         if (rbd_object_map_may_exist(rbd_dev, obj_req->ex.oe_objno))
3075                 obj_req->flags |= RBD_OBJ_FLAG_MAY_EXIST;
3076
3077         if (!(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST) &&
3078             (obj_req->flags & RBD_OBJ_FLAG_NOOP_FOR_NONEXISTENT)) {
3079                 dout("%s %p noop for nonexistent\n", __func__, obj_req);
3080                 return true;
3081         }
3082
3083         return false;
3084 }
3085
3086 /*
3087  * Return:
3088  *   0 - object map update sent
3089  *   1 - object map update isn't needed
3090  *  <0 - error
3091  */
3092 static int rbd_obj_write_pre_object_map(struct rbd_obj_request *obj_req)
3093 {
3094         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3095         u8 new_state;
3096
3097         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3098                 return 1;
3099
3100         if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3101                 new_state = OBJECT_PENDING;
3102         else
3103                 new_state = OBJECT_EXISTS;
3104
3105         return rbd_object_map_update(obj_req, CEPH_NOSNAP, new_state, NULL);
3106 }
3107
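/*
 * Allocate and submit the OSD request for an object write.  If copyup
 * is enabled, a stat op is prepended so that a write to a nonexistent
 * object fails with -ENOENT and triggers the copyup path.
 */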
3108 static int rbd_obj_write_object(struct rbd_obj_request *obj_req)
3109 {
3110         struct ceph_osd_request *osd_req;
3111         int num_ops = count_write_ops(obj_req);
3112         int which = 0;
3113         int ret;
3114
3115         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED)
3116                 num_ops++; /* stat */
3117
3118         osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3119         if (IS_ERR(osd_req))
3120                 return PTR_ERR(osd_req);
3121
3122         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3123                 ret = rbd_osd_setup_stat(osd_req, which++);
3124                 if (ret)
3125                         return ret;
3126         }
3127
3128         rbd_osd_setup_write_ops(osd_req, which);
3129         rbd_osd_format_write(osd_req);
3130
3131         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3132         if (ret)
3133                 return ret;
3134
3135         rbd_osd_submit(osd_req);
3136         return 0;
3137 }
3138
3139 /*
3140  * copyup_bvecs pages are never highmem pages
3141  */
3142 static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
3143 {
3144         struct ceph_bvec_iter it = {
3145                 .bvecs = bvecs,
3146                 .iter = { .bi_size = bytes },
3147         };
3148
3149         ceph_bvec_iter_advance_step(&it, bytes, ({
3150                 if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
3151                                bv.bv_len))
3152                         return false;
3153         }));
3154         return true;
3155 }
3156
3157 #define MODS_ONLY       U32_MAX
3158
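/*
 * Send a copyup request with an empty snapshot context (deep-copyup).
 * @bytes is the amount of copyup data and must be non-zero.
 */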
3159 static int rbd_obj_copyup_empty_snapc(struct rbd_obj_request *obj_req,
3160                                       u32 bytes)
3161 {
3162         struct ceph_osd_request *osd_req;
3163         int ret;
3164
3165         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3166         rbd_assert(bytes > 0 && bytes != MODS_ONLY);
3167
3168         osd_req = __rbd_obj_add_osd_request(obj_req, &rbd_empty_snapc, 1);
3169         if (IS_ERR(osd_req))
3170                 return PTR_ERR(osd_req);
3171
3172         ret = rbd_osd_setup_copyup(osd_req, 0, bytes);
3173         if (ret)
3174                 return ret;
3175
3176         rbd_osd_format_write(osd_req);
3177
3178         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3179         if (ret)
3180                 return ret;
3181
3182         rbd_osd_submit(osd_req);
3183         return 0;
3184 }
3185
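/*
 * Send the write ops with the current snapshot context, preceded by a
 * copyup op unless @bytes is MODS_ONLY.
 */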
3186 static int rbd_obj_copyup_current_snapc(struct rbd_obj_request *obj_req,
3187                                         u32 bytes)
3188 {
3189         struct ceph_osd_request *osd_req;
3190         int num_ops = count_write_ops(obj_req);
3191         int which = 0;
3192         int ret;
3193
3194         dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
3195
3196         if (bytes != MODS_ONLY)
3197                 num_ops++; /* copyup */
3198
3199         osd_req = rbd_obj_add_osd_request(obj_req, num_ops);
3200         if (IS_ERR(osd_req))
3201                 return PTR_ERR(osd_req);
3202
3203         if (bytes != MODS_ONLY) {
3204                 ret = rbd_osd_setup_copyup(osd_req, which++, bytes);
3205                 if (ret)
3206                         return ret;
3207         }
3208
3209         rbd_osd_setup_write_ops(osd_req, which);
3210         rbd_osd_format_write(osd_req);
3211
3212         ret = ceph_osdc_alloc_messages(osd_req, GFP_NOIO);
3213         if (ret)
3214                 return ret;
3215
3216         rbd_osd_submit(osd_req);
3217         return 0;
3218 }
3219
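/*
 * Allocate a bio_vec array of page-sized buffers big enough to hold
 * @obj_overlap bytes of parent data for the copyup.
 */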
3220 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
3221 {
3222         u32 i;
3223
3224         rbd_assert(!obj_req->copyup_bvecs);
3225         obj_req->copyup_bvec_count = calc_pages_for(0, obj_overlap);
3226         obj_req->copyup_bvecs = kcalloc(obj_req->copyup_bvec_count,
3227                                         sizeof(*obj_req->copyup_bvecs),
3228                                         GFP_NOIO);
3229         if (!obj_req->copyup_bvecs)
3230                 return -ENOMEM;
3231
3232         for (i = 0; i < obj_req->copyup_bvec_count; i++) {
3233                 unsigned int len = min(obj_overlap, (u64)PAGE_SIZE);
3234
3235                 obj_req->copyup_bvecs[i].bv_page = alloc_page(GFP_NOIO);
3236                 if (!obj_req->copyup_bvecs[i].bv_page)
3237                         return -ENOMEM;
3238
3239                 obj_req->copyup_bvecs[i].bv_offset = 0;
3240                 obj_req->copyup_bvecs[i].bv_len = len;
3241                 obj_overlap -= len;
3242         }
3243
3244         rbd_assert(!obj_overlap);
3245         return 0;
3246 }
3247
3248 /*
3249  * The target object doesn't exist.  Read the data for the entire
3250  * target object up to the overlap point (if any) from the parent,
3251  * so we can use it for a copyup.
3252  */
3253 static int rbd_obj_copyup_read_parent(struct rbd_obj_request *obj_req)
3254 {
3255         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3256         int ret;
3257
3258         rbd_assert(obj_req->num_img_extents);
3259         prune_extents(obj_req->img_extents, &obj_req->num_img_extents,
3260                       rbd_dev->parent_overlap);
3261         if (!obj_req->num_img_extents) {
3262                 /*
3263                  * The overlap has become 0 (most likely because the
3264                  * image has been flattened).  Re-submit the original write
3265                  * request -- pass MODS_ONLY since the copyup isn't needed
3266                  * anymore.
3267                  */
3268                 return rbd_obj_copyup_current_snapc(obj_req, MODS_ONLY);
3269         }
3270
3271         ret = setup_copyup_bvecs(obj_req, rbd_obj_img_extents_bytes(obj_req));
3272         if (ret)
3273                 return ret;
3274
3275         return rbd_obj_read_from_parent(obj_req);
3276 }
3277
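/*
 * Kick off object map updates for every snapshot in the current
 * snapshot context.  Each update sent bumps pending.num_pending; the
 * first error is recorded in pending.result.
 */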
3278 static void rbd_obj_copyup_object_maps(struct rbd_obj_request *obj_req)
3279 {
3280         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3281         struct ceph_snap_context *snapc = obj_req->img_request->snapc;
3282         u8 new_state;
3283         u32 i;
3284         int ret;
3285
3286         rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3287
3288         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3289                 return;
3290
3291         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3292                 return;
3293
3294         for (i = 0; i < snapc->num_snaps; i++) {
3295                 if ((rbd_dev->header.features & RBD_FEATURE_FAST_DIFF) &&
3296                     i + 1 < snapc->num_snaps)
3297                         new_state = OBJECT_EXISTS_CLEAN;
3298                 else
3299                         new_state = OBJECT_EXISTS;
3300
3301                 ret = rbd_object_map_update(obj_req, snapc->snaps[i],
3302                                             new_state, NULL);
3303                 if (ret < 0) {
3304                         obj_req->pending.result = ret;
3305                         return;
3306                 }
3307
3308                 rbd_assert(!ret);
3309                 obj_req->pending.num_pending++;
3310         }
3311 }
3312
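/*
 * Submit the copyup write(s): an optional deep-copyup with an empty
 * snapshot context followed by the actual modification with the
 * current snapshot context.  Each submitted request bumps
 * pending.num_pending.
 */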
3313 static void rbd_obj_copyup_write_object(struct rbd_obj_request *obj_req)
3314 {
3315         u32 bytes = rbd_obj_img_extents_bytes(obj_req);
3316         int ret;
3317
3318         rbd_assert(!obj_req->pending.result && !obj_req->pending.num_pending);
3319
3320         /*
3321          * Only send non-zero copyup data to save some I/O and network
3322          * bandwidth -- zero copyup data is equivalent to the object not
3323          * existing.
3324          */
3325         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ZEROS)
3326                 bytes = 0;
3327
3328         if (obj_req->img_request->snapc->num_snaps && bytes > 0) {
3329                 /*
3330                  * Send a copyup request with an empty snapshot context to
3331                  * deep-copyup the object through all existing snapshots.
3332                  * A second request with the current snapshot context will be
3333                  * sent for the actual modification.
3334                  */
3335                 ret = rbd_obj_copyup_empty_snapc(obj_req, bytes);
3336                 if (ret) {
3337                         obj_req->pending.result = ret;
3338                         return;
3339                 }
3340
3341                 obj_req->pending.num_pending++;
3342                 bytes = MODS_ONLY;
3343         }
3344
3345         ret = rbd_obj_copyup_current_snapc(obj_req, bytes);
3346         if (ret) {
3347                 obj_req->pending.result = ret;
3348                 return;
3349         }
3350
3351         obj_req->pending.num_pending++;
3352 }
3353
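/*
 * Copyup state machine, entered from the write state machine when the
 * target object turns out not to exist: read from the parent, update
 * the snapshot object maps and write out the copyup data.  Returns
 * true when done, with the outcome in *result.
 */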
3354 static bool rbd_obj_advance_copyup(struct rbd_obj_request *obj_req, int *result)
3355 {
3356         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3357         int ret;
3358
3359 again:
3360         switch (obj_req->copyup_state) {
3361         case RBD_OBJ_COPYUP_START:
3362                 rbd_assert(!*result);
3363
3364                 ret = rbd_obj_copyup_read_parent(obj_req);
3365                 if (ret) {
3366                         *result = ret;
3367                         return true;
3368                 }
3369                 if (obj_req->num_img_extents)
3370                         obj_req->copyup_state = RBD_OBJ_COPYUP_READ_PARENT;
3371                 else
3372                         obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3373                 return false;
3374         case RBD_OBJ_COPYUP_READ_PARENT:
3375                 if (*result)
3376                         return true;
3377
3378                 if (is_zero_bvecs(obj_req->copyup_bvecs,
3379                                   rbd_obj_img_extents_bytes(obj_req))) {
3380                         dout("%s %p detected zeros\n", __func__, obj_req);
3381                         obj_req->flags |= RBD_OBJ_FLAG_COPYUP_ZEROS;
3382                 }
3383
3384                 rbd_obj_copyup_object_maps(obj_req);
3385                 if (!obj_req->pending.num_pending) {
3386                         *result = obj_req->pending.result;
3387                         obj_req->copyup_state = RBD_OBJ_COPYUP_OBJECT_MAPS;
3388                         goto again;
3389                 }
3390                 obj_req->copyup_state = __RBD_OBJ_COPYUP_OBJECT_MAPS;
3391                 return false;
3392         case __RBD_OBJ_COPYUP_OBJECT_MAPS:
3393                 if (!pending_result_dec(&obj_req->pending, result))
3394                         return false;
3395                 /* fall through */
3396         case RBD_OBJ_COPYUP_OBJECT_MAPS:
3397                 if (*result) {
3398                         rbd_warn(rbd_dev, "snap object map update failed: %d",
3399                                  *result);
3400                         return true;
3401                 }
3402
3403                 rbd_obj_copyup_write_object(obj_req);
3404                 if (!obj_req->pending.num_pending) {
3405                         *result = obj_req->pending.result;
3406                         obj_req->copyup_state = RBD_OBJ_COPYUP_WRITE_OBJECT;
3407                         goto again;
3408                 }
3409                 obj_req->copyup_state = __RBD_OBJ_COPYUP_WRITE_OBJECT;
3410                 return false;
3411         case __RBD_OBJ_COPYUP_WRITE_OBJECT:
3412                 if (!pending_result_dec(&obj_req->pending, result))
3413                         return false;
3414                 /* fall through */
3415         case RBD_OBJ_COPYUP_WRITE_OBJECT:
3416                 return true;
3417         default:
3418                 BUG();
3419         }
3420 }
3421
3422 /*
3423  * Return:
3424  *   0 - object map update sent
3425  *   1 - object map update isn't needed
3426  *  <0 - error
3427  */
3428 static int rbd_obj_write_post_object_map(struct rbd_obj_request *obj_req)
3429 {
3430         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3431         u8 current_state = OBJECT_PENDING;
3432
3433         if (!(rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3434                 return 1;
3435
3436         if (!(obj_req->flags & RBD_OBJ_FLAG_DELETION))
3437                 return 1;
3438
3439         return rbd_object_map_update(obj_req, CEPH_NOSNAP, OBJECT_NONEXISTENT,
3440                                      &current_state);
3441 }
3442
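/*
 * Write state machine for an object request: pre object map update,
 * the object write itself, an optional copyup and the post object map
 * update.  Returns true when the object request is done, with the
 * outcome in *result.
 */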
3443 static bool rbd_obj_advance_write(struct rbd_obj_request *obj_req, int *result)
3444 {
3445         struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
3446         int ret;
3447
3448 again:
3449         switch (obj_req->write_state) {
3450         case RBD_OBJ_WRITE_START:
3451                 rbd_assert(!*result);
3452
3453                 if (rbd_obj_write_is_noop(obj_req))
3454                         return true;
3455
3456                 ret = rbd_obj_write_pre_object_map(obj_req);
3457                 if (ret < 0) {
3458                         *result = ret;
3459                         return true;
3460                 }
3461                 obj_req->write_state = RBD_OBJ_WRITE_PRE_OBJECT_MAP;
3462                 if (ret > 0)
3463                         goto again;
3464                 return false;
3465         case RBD_OBJ_WRITE_PRE_OBJECT_MAP:
3466                 if (*result) {
3467                         rbd_warn(rbd_dev, "pre object map update failed: %d",
3468                                  *result);
3469                         return true;
3470                 }
3471                 ret = rbd_obj_write_object(obj_req);
3472                 if (ret) {
3473                         *result = ret;
3474                         return true;
3475                 }
3476                 obj_req->write_state = RBD_OBJ_WRITE_OBJECT;
3477                 return false;
3478         case RBD_OBJ_WRITE_OBJECT:
3479                 if (*result == -ENOENT) {
3480                         if (obj_req->flags & RBD_OBJ_FLAG_COPYUP_ENABLED) {
3481                                 *result = 0;
3482                                 obj_req->copyup_state = RBD_OBJ_COPYUP_START;
3483                                 obj_req->write_state = __RBD_OBJ_WRITE_COPYUP;
3484                                 goto again;
3485                         }
3486                         /*
3487                          * On a non-existent object:
3488                          *   a delete returns -ENOENT, truncate/zero returns 0
3489                          */
3490                         if (obj_req->flags & RBD_OBJ_FLAG_DELETION)
3491                                 *result = 0;
3492                 }
3493                 if (*result)
3494                         return true;
3495
3496                 obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
3497                 goto again;
3498         case __RBD_OBJ_WRITE_COPYUP:
3499                 if (!rbd_obj_advance_copyup(obj_req, result))
3500                         return false;
3501                 /* fall through */
3502         case RBD_OBJ_WRITE_COPYUP:
3503                 if (*result) {
3504                         rbd_warn(rbd_dev, "copyup failed: %d", *result);
3505                         return true;
3506                 }
3507                 ret = rbd_obj_write_post_object_map(obj_req);
3508                 if (ret < 0) {
3509                         *result = ret;
3510                         return true;
3511                 }
3512                 obj_req->write_state = RBD_OBJ_WRITE_POST_OBJECT_MAP;
3513                 if (ret > 0)
3514                         goto again;
3515                 return false;
3516         case RBD_OBJ_WRITE_POST_OBJECT_MAP:
3517                 if (*result)
3518                         rbd_warn(rbd_dev, "post object map update failed: %d",
3519                                  *result);
3520                 return true;
3521         default:
3522                 BUG();
3523         }
3524 }
3525
3526 /*
3527  * Return true if @obj_req is completed.
3528  */
3529 static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req,
3530                                      int *result)
3531 {
3532         struct rbd_img_request *img_req = obj_req->img_request;
3533         struct rbd_device *rbd_dev = img_req->rbd_dev;
3534         bool done;
3535
3536         mutex_lock(&obj_req->state_mutex);
3537         if (!rbd_img_is_write(img_req))
3538                 done = rbd_obj_advance_read(obj_req, result);
3539         else
3540                 done = rbd_obj_advance_write(obj_req, result);
3541         mutex_unlock(&obj_req->state_mutex);
3542
3543         if (done && *result) {
3544                 rbd_assert(*result < 0);
3545                 rbd_warn(rbd_dev, "%s at objno %llu %llu~%llu result %d",
3546                          obj_op_name(img_req->op_type), obj_req->ex.oe_objno,
3547                          obj_req->ex.oe_off, obj_req->ex.oe_len, *result);
3548         }
3549         return done;
3550 }
3551
3552 /*
3553  * This is open-coded in rbd_img_handle_request() to avoid parent chain
3554  * recursion.
3555  */
3556 static void rbd_obj_handle_request(struct rbd_obj_request *obj_req, int result)
3557 {
3558         if (__rbd_obj_handle_request(obj_req, &result))
3559                 rbd_img_handle_request(obj_req->img_request, result);
3560 }
3561
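/*
 * Whether this image request needs the exclusive lock: writes (and
 * also reads if lock_on_read is set or the object map feature is
 * enabled) on a non-snapshot mapping of an image with the
 * exclusive-lock feature.
 */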
3562 static bool need_exclusive_lock(struct rbd_img_request *img_req)
3563 {
3564         struct rbd_device *rbd_dev = img_req->rbd_dev;
3565
3566         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK))
3567                 return false;
3568
3569         if (rbd_is_snap(rbd_dev))
3570                 return false;
3571
3572         rbd_assert(!test_bit(IMG_REQ_CHILD, &img_req->flags));
3573         if (rbd_dev->opts->lock_on_read ||
3574             (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP))
3575                 return true;
3576
3577         return rbd_img_is_write(img_req);
3578 }
3579
3580 static bool rbd_lock_add_request(struct rbd_img_request *img_req)
3581 {
3582         struct rbd_device *rbd_dev = img_req->rbd_dev;
3583         bool locked;
3584
3585         lockdep_assert_held(&rbd_dev->lock_rwsem);
3586         locked = rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED;
3587         spin_lock(&rbd_dev->lock_lists_lock);
3588         rbd_assert(list_empty(&img_req->lock_item));
3589         if (!locked)
3590                 list_add_tail(&img_req->lock_item, &rbd_dev->acquiring_list);
3591         else
3592                 list_add_tail(&img_req->lock_item, &rbd_dev->running_list);
3593         spin_unlock(&rbd_dev->lock_lists_lock);
3594         return locked;
3595 }
3596
3597 static void rbd_lock_del_request(struct rbd_img_request *img_req)
3598 {
3599         struct rbd_device *rbd_dev = img_req->rbd_dev;
3600         bool need_wakeup;
3601
3602         lockdep_assert_held(&rbd_dev->lock_rwsem);
3603         spin_lock(&rbd_dev->lock_lists_lock);
3604         rbd_assert(!list_empty(&img_req->lock_item));
3605         list_del_init(&img_req->lock_item);
3606         need_wakeup = (rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING &&
3607                        list_empty(&rbd_dev->running_list));
3608         spin_unlock(&rbd_dev->lock_lists_lock);
3609         if (need_wakeup)
3610                 complete(&rbd_dev->releasing_wait);
3611 }
3612
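/*
 * Return:
 *   1 - exclusive lock isn't needed or is already held
 *   0 - lock acquisition queued, the request will wait for the lock
 *  <0 - error
 */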
3613 static int rbd_img_exclusive_lock(struct rbd_img_request *img_req)
3614 {
3615         struct rbd_device *rbd_dev = img_req->rbd_dev;
3616
3617         if (!need_exclusive_lock(img_req))
3618                 return 1;
3619
3620         if (rbd_lock_add_request(img_req))
3621                 return 1;
3622
3623         if (rbd_dev->opts->exclusive) {
3624                 WARN_ON(1); /* lock got released? */
3625                 return -EROFS;
3626         }
3627
3628         /*
3629          * Note the use of mod_delayed_work() in rbd_acquire_lock()
3630          * and cancel_delayed_work() in wake_lock_waiters().
3631          */
3632         dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
3633         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
3634         return 0;
3635 }
3636
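/*
 * Start all object requests belonging to @img_req.  Requests still in
 * flight bump pending.num_pending; the first error stops the loop and
 * is recorded in pending.result.
 */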
3637 static void rbd_img_object_requests(struct rbd_img_request *img_req)
3638 {
3639         struct rbd_obj_request *obj_req;
3640
3641         rbd_assert(!img_req->pending.result && !img_req->pending.num_pending);
3642
3643         for_each_obj_request(img_req, obj_req) {
3644                 int result = 0;
3645
3646                 if (__rbd_obj_handle_request(obj_req, &result)) {
3647                         if (result) {
3648                                 img_req->pending.result = result;
3649                                 return;
3650                         }
3651                 } else {
3652                         img_req->pending.num_pending++;
3653                 }
3654         }
3655 }
3656
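/*
 * Image request state machine: take the exclusive lock if needed, then
 * run the object requests.  Returns true when the image request is
 * done, with the outcome in *result.
 */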
3657 static bool rbd_img_advance(struct rbd_img_request *img_req, int *result)
3658 {
3659         struct rbd_device *rbd_dev = img_req->rbd_dev;
3660         int ret;
3661
3662 again:
3663         switch (img_req->state) {
3664         case RBD_IMG_START:
3665                 rbd_assert(!*result);
3666
3667                 ret = rbd_img_exclusive_lock(img_req);
3668                 if (ret < 0) {
3669                         *result = ret;
3670                         return true;
3671                 }
3672                 img_req->state = RBD_IMG_EXCLUSIVE_LOCK;
3673                 if (ret > 0)
3674                         goto again;
3675                 return false;
3676         case RBD_IMG_EXCLUSIVE_LOCK:
3677                 if (*result)
3678                         return true;
3679
3680                 rbd_assert(!need_exclusive_lock(img_req) ||
3681                            __rbd_is_lock_owner(rbd_dev));
3682
3683                 rbd_img_object_requests(img_req);
3684                 if (!img_req->pending.num_pending) {
3685                         *result = img_req->pending.result;
3686                         img_req->state = RBD_IMG_OBJECT_REQUESTS;
3687                         goto again;
3688                 }
3689                 img_req->state = __RBD_IMG_OBJECT_REQUESTS;
3690                 return false;
3691         case __RBD_IMG_OBJECT_REQUESTS:
3692                 if (!pending_result_dec(&img_req->pending, result))
3693                         return false;
3694                 /* fall through */
3695         case RBD_IMG_OBJECT_REQUESTS:
3696                 return true;
3697         default:
3698                 BUG();
3699         }
3700 }
3701
3702 /*
3703  * Return true if @img_req is completed.
3704  */
3705 static bool __rbd_img_handle_request(struct rbd_img_request *img_req,
3706                                      int *result)
3707 {
3708         struct rbd_device *rbd_dev = img_req->rbd_dev;
3709         bool done;
3710
3711         if (need_exclusive_lock(img_req)) {
3712                 down_read(&rbd_dev->lock_rwsem);
3713                 mutex_lock(&img_req->state_mutex);
3714                 done = rbd_img_advance(img_req, result);
3715                 if (done)
3716                         rbd_lock_del_request(img_req);
3717                 mutex_unlock(&img_req->state_mutex);
3718                 up_read(&rbd_dev->lock_rwsem);
3719         } else {
3720                 mutex_lock(&img_req->state_mutex);
3721                 done = rbd_img_advance(img_req, result);
3722                 mutex_unlock(&img_req->state_mutex);
3723         }
3724
3725         if (done && *result) {
3726                 rbd_assert(*result < 0);
3727                 rbd_warn(rbd_dev, "%s%s result %d",
3728                       test_bit(IMG_REQ_CHILD, &img_req->flags) ? "child " : "",
3729                       obj_op_name(img_req->op_type), *result);
3730         }
3731         return done;
3732 }
3733
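/*
 * Drive @img_req to completion.  For a child (parent chain) request
 * the result is fed back into the originating object request; for a
 * top-level request the corresponding block layer request is ended.
 */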
3734 static void rbd_img_handle_request(struct rbd_img_request *img_req, int result)
3735 {
3736 again:
3737         if (!__rbd_img_handle_request(img_req, &result))
3738                 return;
3739
3740         if (test_bit(IMG_REQ_CHILD, &img_req->flags)) {
3741                 struct rbd_obj_request *obj_req = img_req->obj_request;
3742
3743                 rbd_img_request_put(img_req);
3744                 if (__rbd_obj_handle_request(obj_req, &result)) {
3745                         img_req = obj_req->img_request;
3746                         goto again;
3747                 }
3748         } else {
3749                 struct request *rq = img_req->rq;
3750
3751                 rbd_img_request_put(img_req);
3752                 blk_mq_end_request(rq, errno_to_blk_status(result));
3753         }
3754 }
3755
3756 static const struct rbd_client_id rbd_empty_cid;
3757
3758 static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3759                           const struct rbd_client_id *rhs)
3760 {
3761         return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3762 }
3763
3764 static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3765 {
3766         struct rbd_client_id cid;
3767
3768         mutex_lock(&rbd_dev->watch_mutex);
3769         cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3770         cid.handle = rbd_dev->watch_cookie;
3771         mutex_unlock(&rbd_dev->watch_mutex);
3772         return cid;
3773 }
3774
3775 /*
3776  * lock_rwsem must be held for write
3777  */
3778 static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3779                               const struct rbd_client_id *cid)
3780 {
3781         dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3782              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3783              cid->gid, cid->handle);
3784         rbd_dev->owner_cid = *cid; /* struct */
3785 }
3786
3787 static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3788 {
3789         mutex_lock(&rbd_dev->watch_mutex);
3790         sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3791         mutex_unlock(&rbd_dev->watch_mutex);
3792 }
3793
3794 static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3795 {
3796         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3797
3798         rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
3799         strcpy(rbd_dev->lock_cookie, cookie);
3800         rbd_set_owner_cid(rbd_dev, &cid);
3801         queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3802 }
3803
3804 /*
3805  * lock_rwsem must be held for write
3806  */
3807 static int rbd_lock(struct rbd_device *rbd_dev)
3808 {
3809         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3810         char cookie[32];
3811         int ret;
3812
3813         WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3814                 rbd_dev->lock_cookie[0] != '\0');
3815
3816         format_lock_cookie(rbd_dev, cookie);
3817         ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3818                             RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3819                             RBD_LOCK_TAG, "", 0);
3820         if (ret)
3821                 return ret;
3822
3823         __rbd_lock(rbd_dev, cookie);
3824         return 0;
3825 }
3826
3827 /*
3828  * lock_rwsem must be held for write
3829  */
3830 static void rbd_unlock(struct rbd_device *rbd_dev)
3831 {
3832         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3833         int ret;
3834
3835         WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3836                 rbd_dev->lock_cookie[0] == '\0');
3837
3838         ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3839                               RBD_LOCK_NAME, rbd_dev->lock_cookie);
3840         if (ret && ret != -ENOENT)
3841                 rbd_warn(rbd_dev, "failed to unlock header: %d", ret);
3842
3843         /* on error, treat the image as unlocked anyway */
3844         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
3845         rbd_dev->lock_cookie[0] = '\0';
3846         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3847         queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
3848 }
3849
3850 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3851                                 enum rbd_notify_op notify_op,
3852                                 struct page ***preply_pages,
3853                                 size_t *preply_len)
3854 {
3855         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3856         struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3857         char buf[4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN];
3858         int buf_size = sizeof(buf);
3859         void *p = buf;
3860
3861         dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
3862
3863         /* encode *LockPayload NotifyMessage (op + ClientId) */
3864         ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3865         ceph_encode_32(&p, notify_op);
3866         ceph_encode_64(&p, cid.gid);
3867         ceph_encode_64(&p, cid.handle);
3868
3869         return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3870                                 &rbd_dev->header_oloc, buf, buf_size,
3871                                 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
3872 }
3873
3874 static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3875                                enum rbd_notify_op notify_op)
3876 {
3877         struct page **reply_pages;
3878         size_t reply_len;
3879
3880         __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3881         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3882 }
3883
3884 static void rbd_notify_acquired_lock(struct work_struct *work)
3885 {
3886         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3887                                                   acquired_lock_work);
3888
3889         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
3890 }
3891
3892 static void rbd_notify_released_lock(struct work_struct *work)
3893 {
3894         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3895                                                   released_lock_work);
3896
3897         rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
3898 }
3899
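/*
 * Ask the current lock owner to release the lock: send a REQUEST_LOCK
 * notify and decode the owner's ResponseMessage from the acks.
 */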
3900 static int rbd_request_lock(struct rbd_device *rbd_dev)
3901 {
3902         struct page **reply_pages;
3903         size_t reply_len;
3904         bool lock_owner_responded = false;
3905         int ret;
3906
3907         dout("%s rbd_dev %p\n", __func__, rbd_dev);
3908
3909         ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3910                                    &reply_pages, &reply_len);
3911         if (ret && ret != -ETIMEDOUT) {
3912                 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
3913                 goto out;
3914         }
3915
3916         if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3917                 void *p = page_address(reply_pages[0]);
3918                 void *const end = p + reply_len;
3919                 u32 n;
3920
3921                 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3922                 while (n--) {
3923                         u8 struct_v;
3924                         u32 len;
3925
3926                         ceph_decode_need(&p, end, 8 + 8, e_inval);
3927                         p += 8 + 8; /* skip gid and cookie */
3928
3929                         ceph_decode_32_safe(&p, end, len, e_inval);
3930                         if (!len)
3931                                 continue;
3932
3933                         if (lock_owner_responded) {
3934                                 rbd_warn(rbd_dev,
3935                                          "duplicate lock owners detected");
3936                                 ret = -EIO;
3937                                 goto out;
3938                         }
3939
3940                         lock_owner_responded = true;
3941                         ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3942                                                   &struct_v, &len);
3943                         if (ret) {
3944                                 rbd_warn(rbd_dev,
3945                                          "failed to decode ResponseMessage: %d",
3946                                          ret);
3947                                 goto e_inval;
3948                         }
3949
3950                         ret = ceph_decode_32(&p);
3951                 }
3952         }
3953
3954         if (!lock_owner_responded) {
3955                 rbd_warn(rbd_dev, "no lock owners detected");
3956                 ret = -ETIMEDOUT;
3957         }
3958
3959 out:
3960         ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3961         return ret;
3962
3963 e_inval:
3964         ret = -EINVAL;
3965         goto out;
3966 }
3967
3968 /*
3969  * Wake up whoever is waiting for the lock -- either image request
3970  * state machine(s) or rbd_add_acquire_lock() (i.e. "rbd map").
3971  */
3972 static void wake_lock_waiters(struct rbd_device *rbd_dev, int result)
3973 {
3974         struct rbd_img_request *img_req;
3975
3976         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3977         lockdep_assert_held_write(&rbd_dev->lock_rwsem);
3978
3979         cancel_delayed_work(&rbd_dev->lock_dwork);
3980         if (!completion_done(&rbd_dev->acquire_wait)) {
3981                 rbd_assert(list_empty(&rbd_dev->acquiring_list) &&
3982                            list_empty(&rbd_dev->running_list));
3983                 rbd_dev->acquire_err = result;
3984                 complete_all(&rbd_dev->acquire_wait);
3985                 return;
3986         }
3987
3988         list_for_each_entry(img_req, &rbd_dev->acquiring_list, lock_item) {
3989                 mutex_lock(&img_req->state_mutex);
3990                 rbd_assert(img_req->state == RBD_IMG_EXCLUSIVE_LOCK);
3991                 rbd_img_schedule(img_req, result);
3992                 mutex_unlock(&img_req->state_mutex);
3993         }
3994
3995         list_splice_tail_init(&rbd_dev->acquiring_list, &rbd_dev->running_list);
3996 }
3997
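/*
 * Fetch the lockers of the header object and reject anything that
 * wasn't put there by rbd: a foreign tag, a shared lock type or an
 * unexpected cookie format.
 */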
3998 static int get_lock_owner_info(struct rbd_device *rbd_dev,
3999                                struct ceph_locker **lockers, u32 *num_lockers)
4000 {
4001         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4002         u8 lock_type;
4003         char *lock_tag;
4004         int ret;
4005
4006         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4007
4008         ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
4009                                  &rbd_dev->header_oloc, RBD_LOCK_NAME,
4010                                  &lock_type, &lock_tag, lockers, num_lockers);
4011         if (ret)
4012                 return ret;
4013
4014         if (*num_lockers == 0) {
4015                 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
4016                 goto out;
4017         }
4018
4019         if (strcmp(lock_tag, RBD_LOCK_TAG)) {
4020                 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
4021                          lock_tag);
4022                 ret = -EBUSY;
4023                 goto out;
4024         }
4025
4026         if (lock_type == CEPH_CLS_LOCK_SHARED) {
4027                 rbd_warn(rbd_dev, "shared lock type detected");
4028                 ret = -EBUSY;
4029                 goto out;
4030         }
4031
4032         if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
4033                     strlen(RBD_LOCK_COOKIE_PREFIX))) {
4034                 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
4035                          (*lockers)[0].id.cookie);
4036                 ret = -EBUSY;
4037                 goto out;
4038         }
4039
4040 out:
4041         kfree(lock_tag);
4042         return ret;
4043 }
4044
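/*
 * Check whether the locker is still watching the header object.
 * Returns 1 if a matching watcher is found (and records its client
 * id), 0 if not, or a negative error.
 */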
4045 static int find_watcher(struct rbd_device *rbd_dev,
4046                         const struct ceph_locker *locker)
4047 {
4048         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4049         struct ceph_watch_item *watchers;
4050         u32 num_watchers;
4051         u64 cookie;
4052         int i;
4053         int ret;
4054
4055         ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
4056                                       &rbd_dev->header_oloc, &watchers,
4057                                       &num_watchers);
4058         if (ret)
4059                 return ret;
4060
4061         sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
4062         for (i = 0; i < num_watchers; i++) {
4063                 if (!memcmp(&watchers[i].addr, &locker->info.addr,
4064                             sizeof(locker->info.addr)) &&
4065                     watchers[i].cookie == cookie) {
4066                         struct rbd_client_id cid = {
4067                                 .gid = le64_to_cpu(watchers[i].name.num),
4068                                 .handle = cookie,
4069                         };
4070
4071                         dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
4072                              rbd_dev, cid.gid, cid.handle);
4073                         rbd_set_owner_cid(rbd_dev, &cid);
4074                         ret = 1;
4075                         goto out;
4076                 }
4077         }
4078
4079         dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
4080         ret = 0;
4081 out:
4082         kfree(watchers);
4083         return ret;
4084 }
4085
4086 /*
4087  * lock_rwsem must be held for write
4088  */
4089 static int rbd_try_lock(struct rbd_device *rbd_dev)
4090 {
4091         struct ceph_client *client = rbd_dev->rbd_client->client;
4092         struct ceph_locker *lockers;
4093         u32 num_lockers;
4094         int ret;
4095
4096         for (;;) {
4097                 ret = rbd_lock(rbd_dev);
4098                 if (ret != -EBUSY)
4099                         return ret;
4100
4101                 /* determine if the current lock holder is still alive */
4102                 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
4103                 if (ret)
4104                         return ret;
4105
4106                 if (num_lockers == 0)
4107                         goto again;
4108
4109                 ret = find_watcher(rbd_dev, lockers);
4110                 if (ret)
4111                         goto out; /* request lock or error */
4112
4113                 rbd_warn(rbd_dev, "breaking header lock owned by %s%llu",
4114                          ENTITY_NAME(lockers[0].id.name));
4115
4116                 ret = ceph_monc_blacklist_add(&client->monc,
4117                                               &lockers[0].info.addr);
4118                 if (ret) {
4119                         rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
4120                                  ENTITY_NAME(lockers[0].id.name), ret);
4121                         goto out;
4122                 }
4123
4124                 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
4125                                           &rbd_dev->header_oloc, RBD_LOCK_NAME,
4126                                           lockers[0].id.cookie,
4127                                           &lockers[0].id.name);
4128                 if (ret && ret != -ENOENT)
4129                         goto out;
4130
4131 again:
4132                 ceph_free_lockers(lockers, num_lockers);
4133         }
4134
4135 out:
4136         ceph_free_lockers(lockers, num_lockers);
4137         return ret;
4138 }
4139
4140 static int rbd_post_acquire_action(struct rbd_device *rbd_dev)
4141 {
4142         int ret;
4143
4144         if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP) {
4145                 ret = rbd_object_map_open(rbd_dev);
4146                 if (ret)
4147                         return ret;
4148         }
4149
4150         return 0;
4151 }
4152
4153 /*
4154  * Return:
4155  *   0 - lock acquired
4156  *   1 - caller should call rbd_request_lock()
4157  *  <0 - error
4158  */
4159 static int rbd_try_acquire_lock(struct rbd_device *rbd_dev)
4160 {
4161         int ret;
4162
4163         down_read(&rbd_dev->lock_rwsem);
4164         dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
4165              rbd_dev->lock_state);
4166         if (__rbd_is_lock_owner(rbd_dev)) {
4167                 up_read(&rbd_dev->lock_rwsem);
4168                 return 0;
4169         }
4170
4171         up_read(&rbd_dev->lock_rwsem);
4172         down_write(&rbd_dev->lock_rwsem);
4173         dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
4174              rbd_dev->lock_state);
4175         if (__rbd_is_lock_owner(rbd_dev)) {
4176                 up_write(&rbd_dev->lock_rwsem);
4177                 return 0;
4178         }
4179
4180         ret = rbd_try_lock(rbd_dev);
4181         if (ret < 0) {
4182                 rbd_warn(rbd_dev, "failed to lock header: %d", ret);
4183                 if (ret == -EBLACKLISTED)
4184                         goto out;
4185
4186                 ret = 1; /* request lock anyway */
4187         }
4188         if (ret > 0) {
4189                 up_write(&rbd_dev->lock_rwsem);
4190                 return ret;
4191         }
4192
4193         rbd_assert(rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED);
4194         rbd_assert(list_empty(&rbd_dev->running_list));
4195
4196         ret = rbd_post_acquire_action(rbd_dev);
4197         if (ret) {
4198                 rbd_warn(rbd_dev, "post-acquire action failed: %d", ret);
4199                 /*
4200                  * Can't stay in RBD_LOCK_STATE_LOCKED because
4201                  * rbd_lock_add_request() would let the request through,
4202                  * assuming that e.g. object map is locked and loaded.
4203                  */
4204                 rbd_unlock(rbd_dev);
4205         }
4206
4207 out:
4208         wake_lock_waiters(rbd_dev, ret);
4209         up_write(&rbd_dev->lock_rwsem);
4210         return ret;
4211 }
4212
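/*
 * Delayed work that attempts to acquire the exclusive lock, asking the
 * current owner to release it and rescheduling itself as needed.
 */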
4213 static void rbd_acquire_lock(struct work_struct *work)
4214 {
4215         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4216                                             struct rbd_device, lock_dwork);
4217         int ret;
4218
4219         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4220 again:
4221         ret = rbd_try_acquire_lock(rbd_dev);
4222         if (ret <= 0) {
4223                 dout("%s rbd_dev %p ret %d - done\n", __func__, rbd_dev, ret);
4224                 return;
4225         }
4226
4227         ret = rbd_request_lock(rbd_dev);
4228         if (ret == -ETIMEDOUT) {
4229                 goto again; /* treat this as a dead client */
4230         } else if (ret == -EROFS) {
4231                 rbd_warn(rbd_dev, "peer will not release lock");
4232                 down_write(&rbd_dev->lock_rwsem);
4233                 wake_lock_waiters(rbd_dev, ret);
4234                 up_write(&rbd_dev->lock_rwsem);
4235         } else if (ret < 0) {
4236                 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
4237                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4238                                  RBD_RETRY_DELAY);
4239         } else {
4240                 /*
4241                  * lock owner acked, but resend if we don't see them
4242                  * release the lock
4243                  */
4244                 dout("%s rbd_dev %p requeuing lock_dwork\n", __func__,
4245                      rbd_dev);
4246                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
4247                     msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
4248         }
4249 }
4250
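/*
 * Switch to RBD_LOCK_STATE_RELEASING and wait for in-flight image
 * requests on the running list to drain.  Returns true if the lock is
 * quiesced and can now be released.
 */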
4251 static bool rbd_quiesce_lock(struct rbd_device *rbd_dev)
4252 {
4253         bool need_wait;
4254
4255         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4256         lockdep_assert_held_write(&rbd_dev->lock_rwsem);
4257
4258         if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
4259                 return false;
4260
4261         /*
4262          * Ensure that all in-flight IO is flushed.
4263          */
4264         rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
4265         rbd_assert(!completion_done(&rbd_dev->releasing_wait));
4266         need_wait = !list_empty(&rbd_dev->running_list);
4267         downgrade_write(&rbd_dev->lock_rwsem);
4268         if (need_wait)
4269                 wait_for_completion(&rbd_dev->releasing_wait);
4270         up_read(&rbd_dev->lock_rwsem);
4271
4272         down_write(&rbd_dev->lock_rwsem);
4273         if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
4274                 return false;
4275
4276         rbd_assert(list_empty(&rbd_dev->running_list));
4277         return true;
4278 }
4279
4280 static void rbd_pre_release_action(struct rbd_device *rbd_dev)
4281 {
4282         if (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)
4283                 rbd_object_map_close(rbd_dev);
4284 }
4285
4286 static void __rbd_release_lock(struct rbd_device *rbd_dev)
4287 {
4288         rbd_assert(list_empty(&rbd_dev->running_list));
4289
4290         rbd_pre_release_action(rbd_dev);
4291         rbd_unlock(rbd_dev);
4292 }
4293
4294 /*
4295  * lock_rwsem must be held for write
4296  */
4297 static void rbd_release_lock(struct rbd_device *rbd_dev)
4298 {
4299         if (!rbd_quiesce_lock(rbd_dev))
4300                 return;
4301
4302         __rbd_release_lock(rbd_dev);
4303
4304          * Give others a chance to grab the lock -- otherwise we would
4305          * re-acquire it almost immediately if new IO arrived while the
4306          * running list was being drained.  We need to ack our own
4307          * notifications, so this lock_dwork will be requeued from
4308          * rbd_handle_released_lock() by way of maybe_kick_acquire().
4309          * way of maybe_kick_acquire().
4310          */
4311         cancel_delayed_work(&rbd_dev->lock_dwork);
4312 }
4313
4314 static void rbd_release_lock_work(struct work_struct *work)
4315 {
4316         struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
4317                                                   unlock_work);
4318
4319         down_write(&rbd_dev->lock_rwsem);
4320         rbd_release_lock(rbd_dev);
4321         up_write(&rbd_dev->lock_rwsem);
4322 }
4323
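/*
 * If we aren't the lock owner but have image requests waiting for the
 * lock (or a lock attempt pending), kick lock_dwork to (re)acquire it.
 */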
4324 static void maybe_kick_acquire(struct rbd_device *rbd_dev)
4325 {
4326         bool have_requests;
4327
4328         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4329         if (__rbd_is_lock_owner(rbd_dev))
4330                 return;
4331
4332         spin_lock(&rbd_dev->lock_lists_lock);
4333         have_requests = !list_empty(&rbd_dev->acquiring_list);
4334         spin_unlock(&rbd_dev->lock_lists_lock);
4335         if (have_requests || delayed_work_pending(&rbd_dev->lock_dwork)) {
4336                 dout("%s rbd_dev %p kicking lock_dwork\n", __func__, rbd_dev);
4337                 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4338         }
4339 }
4340
4341 static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
4342                                      void **p)
4343 {
4344         struct rbd_client_id cid = { 0 };
4345
4346         if (struct_v >= 2) {
4347                 cid.gid = ceph_decode_64(p);
4348                 cid.handle = ceph_decode_64(p);
4349         }
4350
4351         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4352              cid.handle);
4353         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4354                 down_write(&rbd_dev->lock_rwsem);
4355                 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4356                         /*
4357                          * we already know that the remote client is
4358                          * the owner
4359                          */
4360                         up_write(&rbd_dev->lock_rwsem);
4361                         return;
4362                 }
4363
4364                 rbd_set_owner_cid(rbd_dev, &cid);
4365                 downgrade_write(&rbd_dev->lock_rwsem);
4366         } else {
4367                 down_read(&rbd_dev->lock_rwsem);
4368         }
4369
4370         maybe_kick_acquire(rbd_dev);
4371         up_read(&rbd_dev->lock_rwsem);
4372 }
4373
4374 static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
4375                                      void **p)
4376 {
4377         struct rbd_client_id cid = { 0 };
4378
4379         if (struct_v >= 2) {
4380                 cid.gid = ceph_decode_64(p);
4381                 cid.handle = ceph_decode_64(p);
4382         }
4383
4384         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4385              cid.handle);
4386         if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
4387                 down_write(&rbd_dev->lock_rwsem);
4388                 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
4389                         dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
4390                              __func__, rbd_dev, cid.gid, cid.handle,
4391                              rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
4392                         up_write(&rbd_dev->lock_rwsem);
4393                         return;
4394                 }
4395
4396                 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4397                 downgrade_write(&rbd_dev->lock_rwsem);
4398         } else {
4399                 down_read(&rbd_dev->lock_rwsem);
4400         }
4401
4402         maybe_kick_acquire(rbd_dev);
4403         up_read(&rbd_dev->lock_rwsem);
4404 }
4405
4406 /*
4407  * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
4408  * ResponseMessage is needed.
4409  */
4410 static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
4411                                    void **p)
4412 {
4413         struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
4414         struct rbd_client_id cid = { 0 };
4415         int result = 1;
4416
4417         if (struct_v >= 2) {
4418                 cid.gid = ceph_decode_64(p);
4419                 cid.handle = ceph_decode_64(p);
4420         }
4421
4422         dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
4423              cid.handle);
4424         if (rbd_cid_equal(&cid, &my_cid))
4425                 return result;
4426
4427         down_read(&rbd_dev->lock_rwsem);
4428         if (__rbd_is_lock_owner(rbd_dev)) {
4429                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
4430                     rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
4431                         goto out_unlock;
4432
4433                 /*
4434                  * encode ResponseMessage(0) so the peer can detect
4435                  * a missing owner
4436                  */
4437                 result = 0;
4438
4439                 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
4440                         if (!rbd_dev->opts->exclusive) {
4441                                 dout("%s rbd_dev %p queueing unlock_work\n",
4442                                      __func__, rbd_dev);
4443                                 queue_work(rbd_dev->task_wq,
4444                                            &rbd_dev->unlock_work);
4445                         } else {
4446                                 /* refuse to release the lock */
4447                                 result = -EROFS;
4448                         }
4449                 }
4450         }
4451
4452 out_unlock:
4453         up_read(&rbd_dev->lock_rwsem);
4454         return result;
4455 }
4456
4457 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
4458                                      u64 notify_id, u64 cookie, s32 *result)
4459 {
4460         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4461         char buf[4 + CEPH_ENCODING_START_BLK_LEN];
4462         int buf_size = sizeof(buf);
4463         int ret;
4464
4465         if (result) {
4466                 void *p = buf;
4467
4468                 /* encode ResponseMessage */
4469                 ceph_start_encoding(&p, 1, 1,
4470                                     buf_size - CEPH_ENCODING_START_BLK_LEN);
4471                 ceph_encode_32(&p, *result);
4472         } else {
4473                 buf_size = 0;
4474         }
4475
4476         ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
4477                                    &rbd_dev->header_oloc, notify_id, cookie,
4478                                    buf, buf_size);
4479         if (ret)
4480                 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
4481 }
4482
4483 static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
4484                                    u64 cookie)
4485 {
4486         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4487         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
4488 }
4489
4490 static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
4491                                           u64 notify_id, u64 cookie, s32 result)
4492 {
4493         dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
4494         __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
4495 }
4496
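/*
 * Watch callback: decode the NotifyMessage and dispatch on the notify
 * op (lock acquired/released/requested or header update).  Every
 * notify is acknowledged, with a ResponseMessage where one is needed.
 */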
4497 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
4498                          u64 notifier_id, void *data, size_t data_len)
4499 {
4500         struct rbd_device *rbd_dev = arg;
4501         void *p = data;
4502         void *const end = p + data_len;
4503         u8 struct_v = 0;
4504         u32 len;
4505         u32 notify_op;
4506         int ret;
4507
4508         dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
4509              __func__, rbd_dev, cookie, notify_id, data_len);
4510         if (data_len) {
4511                 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
4512                                           &struct_v, &len);
4513                 if (ret) {
4514                         rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
4515                                  ret);
4516                         return;
4517                 }
4518
4519                 notify_op = ceph_decode_32(&p);
4520         } else {
4521                 /* legacy notification for header updates */
4522                 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
4523                 len = 0;
4524         }
4525
4526         dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
4527         switch (notify_op) {
4528         case RBD_NOTIFY_OP_ACQUIRED_LOCK:
4529                 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
4530                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4531                 break;
4532         case RBD_NOTIFY_OP_RELEASED_LOCK:
4533                 rbd_handle_released_lock(rbd_dev, struct_v, &p);
4534                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4535                 break;
4536         case RBD_NOTIFY_OP_REQUEST_LOCK:
4537                 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
4538                 if (ret <= 0)
4539                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
4540                                                       cookie, ret);
4541                 else
4542                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4543                 break;
4544         case RBD_NOTIFY_OP_HEADER_UPDATE:
4545                 ret = rbd_dev_refresh(rbd_dev);
4546                 if (ret)
4547                         rbd_warn(rbd_dev, "refresh failed: %d", ret);
4548
4549                 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4550                 break;
4551         default:
4552                 if (rbd_is_lock_owner(rbd_dev))
4553                         rbd_acknowledge_notify_result(rbd_dev, notify_id,
4554                                                       cookie, -EOPNOTSUPP);
4555                 else
4556                         rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
4557                 break;
4558         }
4559 }
4560
4561 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
4562
4563 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
4564 {
4565         struct rbd_device *rbd_dev = arg;
4566
4567         rbd_warn(rbd_dev, "encountered watch error: %d", err);
4568
4569         down_write(&rbd_dev->lock_rwsem);
4570         rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
4571         up_write(&rbd_dev->lock_rwsem);
4572
4573         mutex_lock(&rbd_dev->watch_mutex);
4574         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
4575                 __rbd_unregister_watch(rbd_dev);
4576                 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
4577
4578                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
4579         }
4580         mutex_unlock(&rbd_dev->watch_mutex);
4581 }
4582
4583 /*
4584  * watch_mutex must be locked
4585  */
4586 static int __rbd_register_watch(struct rbd_device *rbd_dev)
4587 {
4588         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4589         struct ceph_osd_linger_request *handle;
4590
4591         rbd_assert(!rbd_dev->watch_handle);
4592         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4593
4594         handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
4595                                  &rbd_dev->header_oloc, rbd_watch_cb,
4596                                  rbd_watch_errcb, rbd_dev);
4597         if (IS_ERR(handle))
4598                 return PTR_ERR(handle);
4599
4600         rbd_dev->watch_handle = handle;
4601         return 0;
4602 }
4603
4604 /*
4605  * watch_mutex must be locked
4606  */
4607 static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
4608 {
4609         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4610         int ret;
4611
4612         rbd_assert(rbd_dev->watch_handle);
4613         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4614
4615         ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
4616         if (ret)
4617                 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
4618
4619         rbd_dev->watch_handle = NULL;
4620 }
4621
4622 static int rbd_register_watch(struct rbd_device *rbd_dev)
4623 {
4624         int ret;
4625
4626         mutex_lock(&rbd_dev->watch_mutex);
4627         rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
4628         ret = __rbd_register_watch(rbd_dev);
4629         if (ret)
4630                 goto out;
4631
4632         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4633         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4634
4635 out:
4636         mutex_unlock(&rbd_dev->watch_mutex);
4637         return ret;
4638 }
4639
4640 static void cancel_tasks_sync(struct rbd_device *rbd_dev)
4641 {
4642         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4643
4644         cancel_work_sync(&rbd_dev->acquired_lock_work);
4645         cancel_work_sync(&rbd_dev->released_lock_work);
4646         cancel_delayed_work_sync(&rbd_dev->lock_dwork);
4647         cancel_work_sync(&rbd_dev->unlock_work);
4648 }
4649
4650 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
4651 {
4652         cancel_tasks_sync(rbd_dev);
4653
4654         mutex_lock(&rbd_dev->watch_mutex);
4655         if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
4656                 __rbd_unregister_watch(rbd_dev);
4657         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4658         mutex_unlock(&rbd_dev->watch_mutex);
4659
4660         cancel_delayed_work_sync(&rbd_dev->watch_dwork);
4661         ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
4662 }
4663
4664 /*
4665  * lock_rwsem must be held for write
4666  */
4667 static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
4668 {
4669         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4670         char cookie[32];
4671         int ret;
4672
4673         if (!rbd_quiesce_lock(rbd_dev))
4674                 return;
4675
4676         format_lock_cookie(rbd_dev, cookie);
4677         ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
4678                                   &rbd_dev->header_oloc, RBD_LOCK_NAME,
4679                                   CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
4680                                   RBD_LOCK_TAG, cookie);
4681         if (ret) {
4682                 if (ret != -EOPNOTSUPP)
4683                         rbd_warn(rbd_dev, "failed to update lock cookie: %d",
4684                                  ret);
4685
4686                 /*
4687                  * Lock cookie cannot be updated on older OSDs, so do
4688                  * a manual release and queue an acquire.
4689                  */
4690                 __rbd_release_lock(rbd_dev);
4691                 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4692         } else {
4693                 __rbd_lock(rbd_dev, cookie);
4694                 wake_lock_waiters(rbd_dev, 0);
4695         }
4696 }
4697
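/*
 * Watch re-registration handler, queued (via watch_dwork) from
 * rbd_watch_errcb() after a watch error and re-queued here on transient
 * failures.  On success, reacquire the exclusive lock if it was held
 * and refresh the image header.
 */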
4698 static void rbd_reregister_watch(struct work_struct *work)
4699 {
4700         struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
4701                                             struct rbd_device, watch_dwork);
4702         int ret;
4703
4704         dout("%s rbd_dev %p\n", __func__, rbd_dev);
4705
4706         mutex_lock(&rbd_dev->watch_mutex);
4707         if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
4708                 mutex_unlock(&rbd_dev->watch_mutex);
4709                 return;
4710         }
4711
4712         ret = __rbd_register_watch(rbd_dev);
4713         if (ret) {
4714                 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4715                 if (ret != -EBLACKLISTED && ret != -ENOENT) {
4716                         queue_delayed_work(rbd_dev->task_wq,
4717                                            &rbd_dev->watch_dwork,
4718                                            RBD_RETRY_DELAY);
4719                         mutex_unlock(&rbd_dev->watch_mutex);
4720                         return;
4721                 }
4722
4723                 mutex_unlock(&rbd_dev->watch_mutex);
4724                 down_write(&rbd_dev->lock_rwsem);
4725                 wake_lock_waiters(rbd_dev, ret);
4726                 up_write(&rbd_dev->lock_rwsem);
4727                 return;
4728         }
4729
4730         rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
4731         rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
4732         mutex_unlock(&rbd_dev->watch_mutex);
4733
4734         down_write(&rbd_dev->lock_rwsem);
4735         if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
4736                 rbd_reacquire_lock(rbd_dev);
4737         up_write(&rbd_dev->lock_rwsem);
4738
4739         ret = rbd_dev_refresh(rbd_dev);
4740         if (ret)
4741                 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
4742 }
4743
4744 /*
4745  * Synchronous osd object method call.  Returns the number of bytes
4746  * returned in the inbound buffer, or a negative error code.
4747  */
4748 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
4749                              struct ceph_object_id *oid,
4750                              struct ceph_object_locator *oloc,
4751                              const char *method_name,
4752                              const void *outbound,
4753                              size_t outbound_size,
4754                              void *inbound,
4755                              size_t inbound_size)
4756 {
4757         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4758         struct page *req_page = NULL;
4759         struct page *reply_page;
4760         int ret;
4761
4762         /*
4763          * Method calls are ultimately read operations.  The result
4764          * should be placed into the inbound buffer provided.  Callers
4765          * may also supply outbound data--parameters for the object
4766          * method.  Currently if this is present it will be a
4767          * snapshot id.
4768          */
4769         if (outbound) {
4770                 if (outbound_size > PAGE_SIZE)
4771                         return -E2BIG;
4772
4773                 req_page = alloc_page(GFP_KERNEL);
4774                 if (!req_page)
4775                         return -ENOMEM;
4776
4777                 memcpy(page_address(req_page), outbound, outbound_size);
4778         }
4779
4780         reply_page = alloc_page(GFP_KERNEL);
4781         if (!reply_page) {
4782                 if (req_page)
4783                         __free_page(req_page);
4784                 return -ENOMEM;
4785         }
4786
4787         ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
4788                              CEPH_OSD_FLAG_READ, req_page, outbound_size,
4789                              &reply_page, &inbound_size);
4790         if (!ret) {
4791                 memcpy(inbound, page_address(reply_page), inbound_size);
4792                 ret = inbound_size;
4793         }
4794
4795         if (req_page)
4796                 __free_page(req_page);
4797         __free_page(reply_page);
4798         return ret;
4799 }
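
/*
 * Illustrative sketch (editorial, not part of the driver): a typical
 * caller packs any parameters little-endian, passes them via the
 * outbound buffer and reads the reply from the inbound buffer.
 * "get_foo" below is a hypothetical class method.
 *
 *	__le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
 *	__le64 foo;
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *				  &rbd_dev->header_oloc, "get_foo",
 *				  &snapid, sizeof(snapid),
 *				  &foo, sizeof(foo));
 *	if (ret < 0)
 *		return ret;
 *	if (ret < sizeof(foo))
 *		return -ERANGE;
 *
 * The actual callers below (_rbd_dev_v2_snap_size(), rbd_dev_v2_features(),
 * etc.) follow this pattern.
 */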
4800
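/*
 * Per-request worker.  rbd_queue_rq() queues the work_struct embedded
 * in the request PDU (see rbd_init_request() and tag_set.cmd_size) to
 * rbd_wq; this function then translates the block-layer request into
 * an image request and starts it.
 */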
4801 static void rbd_queue_workfn(struct work_struct *work)
4802 {
4803         struct request *rq = blk_mq_rq_from_pdu(work);
4804         struct rbd_device *rbd_dev = rq->q->queuedata;
4805         struct rbd_img_request *img_request;
4806         struct ceph_snap_context *snapc = NULL;
4807         u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4808         u64 length = blk_rq_bytes(rq);
4809         enum obj_operation_type op_type;
4810         u64 mapping_size;
4811         int result;
4812
4813         switch (req_op(rq)) {
4814         case REQ_OP_DISCARD:
4815                 op_type = OBJ_OP_DISCARD;
4816                 break;
4817         case REQ_OP_WRITE_ZEROES:
4818                 op_type = OBJ_OP_ZEROOUT;
4819                 break;
4820         case REQ_OP_WRITE:
4821                 op_type = OBJ_OP_WRITE;
4822                 break;
4823         case REQ_OP_READ:
4824                 op_type = OBJ_OP_READ;
4825                 break;
4826         default:
4827                 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4828                 result = -EIO;
4829                 goto err;
4830         }
4831
4832         /* Ignore/skip any zero-length requests */
4833
4834         if (!length) {
4835                 dout("%s: zero-length request\n", __func__);
4836                 result = 0;
4837                 goto err_rq;
4838         }
4839
4840         if (op_type != OBJ_OP_READ) {
4841                 if (rbd_is_ro(rbd_dev)) {
4842                         rbd_warn(rbd_dev, "%s on read-only mapping",
4843                                  obj_op_name(op_type));
4844                         result = -EIO;
4845                         goto err;
4846                 }
4847                 rbd_assert(!rbd_is_snap(rbd_dev));
4848         }
4849
4850         /*
4851          * Quit early if the mapped snapshot no longer exists.  It's
4852          * still possible the snapshot will have disappeared by the
4853          * time our request arrives at the osd, but there's no sense in
4854          * sending it if we already know.
4855          */
4856         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4857                 dout("request for non-existent snapshot\n");
4858                 rbd_assert(rbd_is_snap(rbd_dev));
4859                 result = -ENXIO;
4860                 goto err_rq;
4861         }
4862
4863         if (offset && length > U64_MAX - offset + 1) {
4864                 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4865                          length);
4866                 result = -EINVAL;
4867                 goto err_rq;    /* Shouldn't happen */
4868         }
4869
4870         blk_mq_start_request(rq);
4871
4872         down_read(&rbd_dev->header_rwsem);
4873         mapping_size = rbd_dev->mapping.size;
4874         if (op_type != OBJ_OP_READ) {
4875                 snapc = rbd_dev->header.snapc;
4876                 ceph_get_snap_context(snapc);
4877         }
4878         up_read(&rbd_dev->header_rwsem);
4879
4880         if (offset + length > mapping_size) {
4881                 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4882                          length, mapping_size);
4883                 result = -EIO;
4884                 goto err_rq;
4885         }
4886
4887         img_request = rbd_img_request_create(rbd_dev, op_type, snapc);
4888         if (!img_request) {
4889                 result = -ENOMEM;
4890                 goto err_rq;
4891         }
4892         img_request->rq = rq;
4893         snapc = NULL; /* img_request consumes a ref */
4894
4895         dout("%s rbd_dev %p img_req %p %s %llu~%llu\n", __func__, rbd_dev,
4896              img_request, obj_op_name(op_type), offset, length);
4897
4898         if (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_ZEROOUT)
4899                 result = rbd_img_fill_nodata(img_request, offset, length);
4900         else
4901                 result = rbd_img_fill_from_bio(img_request, offset, length,
4902                                                rq->bio);
4903         if (result)
4904                 goto err_img_request;
4905
4906         rbd_img_handle_request(img_request, 0);
4907         return;
4908
4909 err_img_request:
4910         rbd_img_request_put(img_request);
4911 err_rq:
4912         if (result)
4913                 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
4914                          obj_op_name(op_type), length, offset, result);
4915         ceph_put_snap_context(snapc);
4916 err:
4917         blk_mq_end_request(rq, errno_to_blk_status(result));
4918 }
4919
4920 static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
4921                 const struct blk_mq_queue_data *bd)
4922 {
4923         struct request *rq = bd->rq;
4924         struct work_struct *work = blk_mq_rq_to_pdu(rq);
4925
4926         queue_work(rbd_wq, work);
4927         return BLK_STS_OK;
4928 }
4929
4930 static void rbd_free_disk(struct rbd_device *rbd_dev)
4931 {
4932         blk_cleanup_queue(rbd_dev->disk->queue);
4933         blk_mq_free_tag_set(&rbd_dev->tag_set);
4934         put_disk(rbd_dev->disk);
4935         rbd_dev->disk = NULL;
4936 }
4937
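/*
 * Synchronously read up to @buf_len bytes from the start of the given
 * object into @buf.  Returns the number of bytes read or a negative
 * error code.
 */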
4938 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4939                              struct ceph_object_id *oid,
4940                              struct ceph_object_locator *oloc,
4941                              void *buf, int buf_len)
4942 {
4944         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4945         struct ceph_osd_request *req;
4946         struct page **pages;
4947         int num_pages = calc_pages_for(0, buf_len);
4948         int ret;
4949
4950         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4951         if (!req)
4952                 return -ENOMEM;
4953
4954         ceph_oid_copy(&req->r_base_oid, oid);
4955         ceph_oloc_copy(&req->r_base_oloc, oloc);
4956         req->r_flags = CEPH_OSD_FLAG_READ;
4957
4958         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4959         if (IS_ERR(pages)) {
4960                 ret = PTR_ERR(pages);
4961                 goto out_req;
4962         }
4963
4964         osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4965         osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4966                                          true);
4967
4968         ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
4969         if (ret)
4970                 goto out_req;
4971
4972         ceph_osdc_start_request(osdc, req, false);
4973         ret = ceph_osdc_wait_request(osdc, req);
4974         if (ret >= 0)
4975                 ceph_copy_from_page_vector(pages, buf, 0, ret);
4976
4977 out_req:
4978         ceph_osdc_put_request(req);
4979         return ret;
4980 }
4981
4982 /*
4983  * Read the complete header for the given rbd device.  On successful
4984  * return, the rbd_dev->header field will contain up-to-date
4985  * information about the image.
4986  */
4987 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
4988 {
4989         struct rbd_image_header_ondisk *ondisk = NULL;
4990         u32 snap_count = 0;
4991         u64 names_size = 0;
4992         u32 want_count;
4993         int ret;
4994
4995         /*
4996          * The complete header will include an array of its 64-bit
4997          * snapshot ids, followed by the names of those snapshots as
4998          * a contiguous block of NUL-terminated strings.  Note that
4999          * the number of snapshots could change by the time we read
5000          * it in, in which case we re-read it.
5001          */
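        /*
         * Editorial sketch of the resulting buffer layout:
         *
         *   struct rbd_image_header_ondisk            fixed-size header
         *   struct rbd_image_snap_ondisk[snap_count]  snapshot id/size pairs
         *   char names[names_size]                    NUL-terminated snap names
         *
         * hence the size computation below.
         */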
5002         do {
5003                 size_t size;
5004
5005                 kfree(ondisk);
5006
5007                 size = sizeof (*ondisk);
5008                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
5009                 size += names_size;
5010                 ondisk = kmalloc(size, GFP_KERNEL);
5011                 if (!ondisk)
5012                         return -ENOMEM;
5013
5014                 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
5015                                         &rbd_dev->header_oloc, ondisk, size);
5016                 if (ret < 0)
5017                         goto out;
5018                 if ((size_t)ret < size) {
5019                         rbd_warn(rbd_dev, "short header read (want %zu got %d)",
5020                                 size, ret);
5021                         ret = -ENXIO;
5022                         goto out;
5023                 }
5024                 if (!rbd_dev_ondisk_valid(ondisk)) {
5025                         ret = -ENXIO;
5026                         rbd_warn(rbd_dev, "invalid header");
5027                         goto out;
5028                 }
5029
5030                 names_size = le64_to_cpu(ondisk->snap_names_len);
5031                 want_count = snap_count;
5032                 snap_count = le32_to_cpu(ondisk->snap_count);
5033         } while (snap_count != want_count);
5034
5035         ret = rbd_header_from_disk(rbd_dev, ondisk);
5036 out:
5037         kfree(ondisk);
5038
5039         return ret;
5040 }
5041
5042 /*
5043  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
5044  * has disappeared from the (just updated) snapshot context.
5045  */
5046 static void rbd_exists_validate(struct rbd_device *rbd_dev)
5047 {
5048         u64 snap_id;
5049
5050         if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
5051                 return;
5052
5053         snap_id = rbd_dev->spec->snap_id;
5054         if (snap_id == CEPH_NOSNAP)
5055                 return;
5056
5057         if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
5058                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5059 }
5060
5061 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
5062 {
5063         sector_t size;
5064
5065         /*
5066          * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
5067          * try to update its size.  If REMOVING is set, updating size
5068          * is just useless work since the device can't be opened.
5069          */
5070         if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
5071             !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
5072                 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
5073                 dout("setting size to %llu sectors", (unsigned long long)size);
5074                 set_capacity(rbd_dev->disk, size);
5075                 revalidate_disk(rbd_dev->disk);
5076         }
5077 }
5078
5079 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
5080 {
5081         u64 mapping_size;
5082         int ret;
5083
5084         down_write(&rbd_dev->header_rwsem);
5085         mapping_size = rbd_dev->mapping.size;
5086
5087         ret = rbd_dev_header_info(rbd_dev);
5088         if (ret)
5089                 goto out;
5090
5091         /*
5092          * If there is a parent, see if it has disappeared due to the
5093          * mapped image getting flattened.
5094          */
5095         if (rbd_dev->parent) {
5096                 ret = rbd_dev_v2_parent_info(rbd_dev);
5097                 if (ret)
5098                         goto out;
5099         }
5100
5101         if (!rbd_is_snap(rbd_dev)) {
5102                 rbd_dev->mapping.size = rbd_dev->header.image_size;
5103         } else {
5104                 /* validate mapped snapshot's EXISTS flag */
5105                 rbd_exists_validate(rbd_dev);
5106         }
5107
5108 out:
5109         up_write(&rbd_dev->header_rwsem);
5110         if (!ret && mapping_size != rbd_dev->mapping.size)
5111                 rbd_dev_update_size(rbd_dev);
5112
5113         return ret;
5114 }
5115
5116 static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
5117                 unsigned int hctx_idx, unsigned int numa_node)
5118 {
5119         struct work_struct *work = blk_mq_rq_to_pdu(rq);
5120
5121         INIT_WORK(work, rbd_queue_workfn);
5122         return 0;
5123 }
5124
5125 static const struct blk_mq_ops rbd_mq_ops = {
5126         .queue_rq       = rbd_queue_rq,
5127         .init_request   = rbd_init_request,
5128 };
5129
5130 static int rbd_init_disk(struct rbd_device *rbd_dev)
5131 {
5132         struct gendisk *disk;
5133         struct request_queue *q;
5134         unsigned int objset_bytes =
5135             rbd_dev->layout.object_size * rbd_dev->layout.stripe_count;
5136         int err;
5137
5138         /* create gendisk info */
5139         disk = alloc_disk(single_major ?
5140                           (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
5141                           RBD_MINORS_PER_MAJOR);
5142         if (!disk)
5143                 return -ENOMEM;
5144
5145         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
5146                  rbd_dev->dev_id);
5147         disk->major = rbd_dev->major;
5148         disk->first_minor = rbd_dev->minor;
5149         if (single_major)
5150                 disk->flags |= GENHD_FL_EXT_DEVT;
5151         disk->fops = &rbd_bd_ops;
5152         disk->private_data = rbd_dev;
5153
5154         memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
5155         rbd_dev->tag_set.ops = &rbd_mq_ops;
5156         rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
5157         rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
5158         rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE;
5159         rbd_dev->tag_set.nr_hw_queues = 1;
5160         rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
5161
5162         err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
5163         if (err)
5164                 goto out_disk;
5165
5166         q = blk_mq_init_queue(&rbd_dev->tag_set);
5167         if (IS_ERR(q)) {
5168                 err = PTR_ERR(q);
5169                 goto out_tag_set;
5170         }
5171
5172         blk_queue_flag_set(QUEUE_FLAG_NONROT, q);
5173         /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
5174
5175         blk_queue_max_hw_sectors(q, objset_bytes >> SECTOR_SHIFT);
5176         q->limits.max_sectors = queue_max_hw_sectors(q);
5177         blk_queue_max_segments(q, USHRT_MAX);
5178         blk_queue_max_segment_size(q, UINT_MAX);
5179         blk_queue_io_min(q, rbd_dev->opts->alloc_size);
5180         blk_queue_io_opt(q, rbd_dev->opts->alloc_size);
5181
5182         if (rbd_dev->opts->trim) {
5183                 blk_queue_flag_set(QUEUE_FLAG_DISCARD, q);
5184                 q->limits.discard_granularity = rbd_dev->opts->alloc_size;
5185                 blk_queue_max_discard_sectors(q, objset_bytes >> SECTOR_SHIFT);
5186                 blk_queue_max_write_zeroes_sectors(q, objset_bytes >> SECTOR_SHIFT);
5187         }
5188
5189         if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
5190                 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
5191
5192         /*
5193          * disk_release() expects a queue ref from add_disk() and will
5194          * put it.  Hold an extra ref until add_disk() is called.
5195          */
5196         WARN_ON(!blk_get_queue(q));
5197         disk->queue = q;
5198         q->queuedata = rbd_dev;
5199
5200         rbd_dev->disk = disk;
5201
5202         return 0;
5203 out_tag_set:
5204         blk_mq_free_tag_set(&rbd_dev->tag_set);
5205 out_disk:
5206         put_disk(disk);
5207         return err;
5208 }
5209
5210 /*
5211  * sysfs
5212  */
5213
5214 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
5215 {
5216         return container_of(dev, struct rbd_device, dev);
5217 }
5218
5219 static ssize_t rbd_size_show(struct device *dev,
5220                              struct device_attribute *attr, char *buf)
5221 {
5222         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5223
5224         return sprintf(buf, "%llu\n",
5225                 (unsigned long long)rbd_dev->mapping.size);
5226 }
5227
5228 /*
5229  * Note this shows the features for whatever's mapped, which is not
5230  * necessarily the base image.
5231  */
5232 static ssize_t rbd_features_show(struct device *dev,
5233                              struct device_attribute *attr, char *buf)
5234 {
5235         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5236
5237         return sprintf(buf, "0x%016llx\n",
5238                         (unsigned long long)rbd_dev->mapping.features);
5239 }
5240
5241 static ssize_t rbd_major_show(struct device *dev,
5242                               struct device_attribute *attr, char *buf)
5243 {
5244         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5245
5246         if (rbd_dev->major)
5247                 return sprintf(buf, "%d\n", rbd_dev->major);
5248
5249         return sprintf(buf, "(none)\n");
5250 }
5251
5252 static ssize_t rbd_minor_show(struct device *dev,
5253                               struct device_attribute *attr, char *buf)
5254 {
5255         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5256
5257         return sprintf(buf, "%d\n", rbd_dev->minor);
5258 }
5259
5260 static ssize_t rbd_client_addr_show(struct device *dev,
5261                                     struct device_attribute *attr, char *buf)
5262 {
5263         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5264         struct ceph_entity_addr *client_addr =
5265             ceph_client_addr(rbd_dev->rbd_client->client);
5266
5267         return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
5268                        le32_to_cpu(client_addr->nonce));
5269 }
5270
5271 static ssize_t rbd_client_id_show(struct device *dev,
5272                                   struct device_attribute *attr, char *buf)
5273 {
5274         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5275
5276         return sprintf(buf, "client%lld\n",
5277                        ceph_client_gid(rbd_dev->rbd_client->client));
5278 }
5279
5280 static ssize_t rbd_cluster_fsid_show(struct device *dev,
5281                                      struct device_attribute *attr, char *buf)
5282 {
5283         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5284
5285         return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
5286 }
5287
5288 static ssize_t rbd_config_info_show(struct device *dev,
5289                                     struct device_attribute *attr, char *buf)
5290 {
5291         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5292
5293         return sprintf(buf, "%s\n", rbd_dev->config_info);
5294 }
5295
5296 static ssize_t rbd_pool_show(struct device *dev,
5297                              struct device_attribute *attr, char *buf)
5298 {
5299         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5300
5301         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
5302 }
5303
5304 static ssize_t rbd_pool_id_show(struct device *dev,
5305                              struct device_attribute *attr, char *buf)
5306 {
5307         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5308
5309         return sprintf(buf, "%llu\n",
5310                         (unsigned long long) rbd_dev->spec->pool_id);
5311 }
5312
5313 static ssize_t rbd_pool_ns_show(struct device *dev,
5314                                 struct device_attribute *attr, char *buf)
5315 {
5316         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5317
5318         return sprintf(buf, "%s\n", rbd_dev->spec->pool_ns ?: "");
5319 }
5320
5321 static ssize_t rbd_name_show(struct device *dev,
5322                              struct device_attribute *attr, char *buf)
5323 {
5324         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5325
5326         if (rbd_dev->spec->image_name)
5327                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
5328
5329         return sprintf(buf, "(unknown)\n");
5330 }
5331
5332 static ssize_t rbd_image_id_show(struct device *dev,
5333                              struct device_attribute *attr, char *buf)
5334 {
5335         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5336
5337         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
5338 }
5339
5340 /*
5341  * Shows the name of the currently-mapped snapshot (or
5342  * RBD_SNAP_HEAD_NAME for the base image).
5343  */
5344 static ssize_t rbd_snap_show(struct device *dev,
5345                              struct device_attribute *attr,
5346                              char *buf)
5347 {
5348         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5349
5350         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
5351 }
5352
5353 static ssize_t rbd_snap_id_show(struct device *dev,
5354                                 struct device_attribute *attr, char *buf)
5355 {
5356         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5357
5358         return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
5359 }
5360
5361 /*
5362  * For a v2 image, shows the chain of parent images, separated by empty
5363  * lines.  For v1 images or if there is no parent, shows "(no parent
5364  * image)".
5365  */
5366 static ssize_t rbd_parent_show(struct device *dev,
5367                                struct device_attribute *attr,
5368                                char *buf)
5369 {
5370         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5371         ssize_t count = 0;
5372
5373         if (!rbd_dev->parent)
5374                 return sprintf(buf, "(no parent image)\n");
5375
5376         for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
5377                 struct rbd_spec *spec = rbd_dev->parent_spec;
5378
5379                 count += sprintf(&buf[count], "%s"
5380                             "pool_id %llu\npool_name %s\n"
5381                             "pool_ns %s\n"
5382                             "image_id %s\nimage_name %s\n"
5383                             "snap_id %llu\nsnap_name %s\n"
5384                             "overlap %llu\n",
5385                             !count ? "" : "\n", /* first? */
5386                             spec->pool_id, spec->pool_name,
5387                             spec->pool_ns ?: "",
5388                             spec->image_id, spec->image_name ?: "(unknown)",
5389                             spec->snap_id, spec->snap_name,
5390                             rbd_dev->parent_overlap);
5391         }
5392
5393         return count;
5394 }
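
/*
 * Example output (editorial illustration, hypothetical values):
 *
 *   pool_id 2
 *   pool_name rbd
 *   pool_ns
 *   image_id 1f68c2a3b4d5
 *   image_name parent-image
 *   snap_id 4
 *   snap_name base-snap
 *   overlap 10737418240
 */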
5395
5396 static ssize_t rbd_image_refresh(struct device *dev,
5397                                  struct device_attribute *attr,
5398                                  const char *buf,
5399                                  size_t size)
5400 {
5401         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5402         int ret;
5403
5404         ret = rbd_dev_refresh(rbd_dev);
5405         if (ret)
5406                 return ret;
5407
5408         return size;
5409 }
5410
5411 static DEVICE_ATTR(size, 0444, rbd_size_show, NULL);
5412 static DEVICE_ATTR(features, 0444, rbd_features_show, NULL);
5413 static DEVICE_ATTR(major, 0444, rbd_major_show, NULL);
5414 static DEVICE_ATTR(minor, 0444, rbd_minor_show, NULL);
5415 static DEVICE_ATTR(client_addr, 0444, rbd_client_addr_show, NULL);
5416 static DEVICE_ATTR(client_id, 0444, rbd_client_id_show, NULL);
5417 static DEVICE_ATTR(cluster_fsid, 0444, rbd_cluster_fsid_show, NULL);
5418 static DEVICE_ATTR(config_info, 0400, rbd_config_info_show, NULL);
5419 static DEVICE_ATTR(pool, 0444, rbd_pool_show, NULL);
5420 static DEVICE_ATTR(pool_id, 0444, rbd_pool_id_show, NULL);
5421 static DEVICE_ATTR(pool_ns, 0444, rbd_pool_ns_show, NULL);
5422 static DEVICE_ATTR(name, 0444, rbd_name_show, NULL);
5423 static DEVICE_ATTR(image_id, 0444, rbd_image_id_show, NULL);
5424 static DEVICE_ATTR(refresh, 0200, NULL, rbd_image_refresh);
5425 static DEVICE_ATTR(current_snap, 0444, rbd_snap_show, NULL);
5426 static DEVICE_ATTR(snap_id, 0444, rbd_snap_id_show, NULL);
5427 static DEVICE_ATTR(parent, 0444, rbd_parent_show, NULL);
5428
5429 static struct attribute *rbd_attrs[] = {
5430         &dev_attr_size.attr,
5431         &dev_attr_features.attr,
5432         &dev_attr_major.attr,
5433         &dev_attr_minor.attr,
5434         &dev_attr_client_addr.attr,
5435         &dev_attr_client_id.attr,
5436         &dev_attr_cluster_fsid.attr,
5437         &dev_attr_config_info.attr,
5438         &dev_attr_pool.attr,
5439         &dev_attr_pool_id.attr,
5440         &dev_attr_pool_ns.attr,
5441         &dev_attr_name.attr,
5442         &dev_attr_image_id.attr,
5443         &dev_attr_current_snap.attr,
5444         &dev_attr_snap_id.attr,
5445         &dev_attr_parent.attr,
5446         &dev_attr_refresh.attr,
5447         NULL
5448 };
5449
5450 static struct attribute_group rbd_attr_group = {
5451         .attrs = rbd_attrs,
5452 };
5453
5454 static const struct attribute_group *rbd_attr_groups[] = {
5455         &rbd_attr_group,
5456         NULL
5457 };
5458
5459 static void rbd_dev_release(struct device *dev);
5460
5461 static const struct device_type rbd_device_type = {
5462         .name           = "rbd",
5463         .groups         = rbd_attr_groups,
5464         .release        = rbd_dev_release,
5465 };
5466
5467 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
5468 {
5469         kref_get(&spec->kref);
5470
5471         return spec;
5472 }
5473
5474 static void rbd_spec_free(struct kref *kref);
5475 static void rbd_spec_put(struct rbd_spec *spec)
5476 {
5477         if (spec)
5478                 kref_put(&spec->kref, rbd_spec_free);
5479 }
5480
5481 static struct rbd_spec *rbd_spec_alloc(void)
5482 {
5483         struct rbd_spec *spec;
5484
5485         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
5486         if (!spec)
5487                 return NULL;
5488
5489         spec->pool_id = CEPH_NOPOOL;
5490         spec->snap_id = CEPH_NOSNAP;
5491         kref_init(&spec->kref);
5492
5493         return spec;
5494 }
5495
5496 static void rbd_spec_free(struct kref *kref)
5497 {
5498         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
5499
5500         kfree(spec->pool_name);
5501         kfree(spec->pool_ns);
5502         kfree(spec->image_id);
5503         kfree(spec->image_name);
5504         kfree(spec->snap_name);
5505         kfree(spec);
5506 }
5507
5508 static void rbd_dev_free(struct rbd_device *rbd_dev)
5509 {
5510         WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
5511         WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
5512
5513         ceph_oid_destroy(&rbd_dev->header_oid);
5514         ceph_oloc_destroy(&rbd_dev->header_oloc);
5515         kfree(rbd_dev->config_info);
5516
5517         rbd_put_client(rbd_dev->rbd_client);
5518         rbd_spec_put(rbd_dev->spec);
5519         kfree(rbd_dev->opts);
5520         kfree(rbd_dev);
5521 }
5522
5523 static void rbd_dev_release(struct device *dev)
5524 {
5525         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5526         bool need_put = !!rbd_dev->opts;
5527
5528         if (need_put) {
5529                 destroy_workqueue(rbd_dev->task_wq);
5530                 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5531         }
5532
5533         rbd_dev_free(rbd_dev);
5534
5535         /*
5536          * This is racy, but way better than dropping the module reference
5537          * outside of the release callback.  The race window is pretty small,
5538          * so doing something similar to dm (dm-builtin.c) is overkill.
5539          */
5540         if (need_put)
5541                 module_put(THIS_MODULE);
5542 }
5543
5544 static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
5545                                            struct rbd_spec *spec)
5546 {
5547         struct rbd_device *rbd_dev;
5548
5549         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
5550         if (!rbd_dev)
5551                 return NULL;
5552
5553         spin_lock_init(&rbd_dev->lock);
5554         INIT_LIST_HEAD(&rbd_dev->node);
5555         init_rwsem(&rbd_dev->header_rwsem);
5556
5557         rbd_dev->header.data_pool_id = CEPH_NOPOOL;
5558         ceph_oid_init(&rbd_dev->header_oid);
5559         rbd_dev->header_oloc.pool = spec->pool_id;
5560         if (spec->pool_ns) {
5561                 WARN_ON(!*spec->pool_ns);
5562                 rbd_dev->header_oloc.pool_ns =
5563                     ceph_find_or_create_string(spec->pool_ns,
5564                                                strlen(spec->pool_ns));
5565         }
5566
5567         mutex_init(&rbd_dev->watch_mutex);
5568         rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
5569         INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
5570
5571         init_rwsem(&rbd_dev->lock_rwsem);
5572         rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
5573         INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
5574         INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
5575         INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
5576         INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
5577         spin_lock_init(&rbd_dev->lock_lists_lock);
5578         INIT_LIST_HEAD(&rbd_dev->acquiring_list);
5579         INIT_LIST_HEAD(&rbd_dev->running_list);
5580         init_completion(&rbd_dev->acquire_wait);
5581         init_completion(&rbd_dev->releasing_wait);
5582
5583         spin_lock_init(&rbd_dev->object_map_lock);
5584
5585         rbd_dev->dev.bus = &rbd_bus_type;
5586         rbd_dev->dev.type = &rbd_device_type;
5587         rbd_dev->dev.parent = &rbd_root_dev;
5588         device_initialize(&rbd_dev->dev);
5589
5590         rbd_dev->rbd_client = rbdc;
5591         rbd_dev->spec = spec;
5592
5593         return rbd_dev;
5594 }
5595
5596 /*
5597  * Create a mapping rbd_dev.
5598  */
5599 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
5600                                          struct rbd_spec *spec,
5601                                          struct rbd_options *opts)
5602 {
5603         struct rbd_device *rbd_dev;
5604
5605         rbd_dev = __rbd_dev_create(rbdc, spec);
5606         if (!rbd_dev)
5607                 return NULL;
5608
5609         rbd_dev->opts = opts;
5610
5611         /* get an id and fill in device name */
5612         rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
5613                                          minor_to_rbd_dev_id(1 << MINORBITS),
5614                                          GFP_KERNEL);
5615         if (rbd_dev->dev_id < 0)
5616                 goto fail_rbd_dev;
5617
5618         sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
5619         rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
5620                                                    rbd_dev->name);
5621         if (!rbd_dev->task_wq)
5622                 goto fail_dev_id;
5623
5624         /* we have a ref from do_rbd_add() */
5625         __module_get(THIS_MODULE);
5626
5627         dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
5628         return rbd_dev;
5629
5630 fail_dev_id:
5631         ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
5632 fail_rbd_dev:
5633         rbd_dev_free(rbd_dev);
5634         return NULL;
5635 }
5636
5637 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
5638 {
5639         if (rbd_dev)
5640                 put_device(&rbd_dev->dev);
5641 }
5642
5643 /*
5644  * Get the size and object order for an image snapshot, or if
5645  * snap_id is CEPH_NOSNAP, gets this information for the base
5646  * image.
5647  */
5648 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
5649                                 u8 *order, u64 *snap_size)
5650 {
5651         __le64 snapid = cpu_to_le64(snap_id);
5652         int ret;
5653         struct {
5654                 u8 order;
5655                 __le64 size;
5656         } __attribute__ ((packed)) size_buf = { 0 };
5657
5658         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5659                                   &rbd_dev->header_oloc, "get_size",
5660                                   &snapid, sizeof(snapid),
5661                                   &size_buf, sizeof(size_buf));
5662         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5663         if (ret < 0)
5664                 return ret;
5665         if (ret < sizeof (size_buf))
5666                 return -ERANGE;
5667
5668         if (order) {
5669                 *order = size_buf.order;
5670                 dout("  order %u", (unsigned int)*order);
5671         }
5672         *snap_size = le64_to_cpu(size_buf.size);
5673
5674         dout("  snap_id 0x%016llx snap_size = %llu\n",
5675                 (unsigned long long)snap_id,
5676                 (unsigned long long)*snap_size);
5677
5678         return 0;
5679 }
5680
5681 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
5682 {
5683         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
5684                                         &rbd_dev->header.obj_order,
5685                                         &rbd_dev->header.image_size);
5686 }
5687
5688 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
5689 {
5690         size_t size;
5691         void *reply_buf;
5692         int ret;
5693         void *p;
5694
5695         /* Response will be an encoded string, which includes a length */
5696         size = sizeof(__le32) + RBD_OBJ_PREFIX_LEN_MAX;
5697         reply_buf = kzalloc(size, GFP_KERNEL);
5698         if (!reply_buf)
5699                 return -ENOMEM;
5700
5701         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5702                                   &rbd_dev->header_oloc, "get_object_prefix",
5703                                   NULL, 0, reply_buf, size);
5704         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5705         if (ret < 0)
5706                 goto out;
5707
5708         p = reply_buf;
5709         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
5710                                                 p + ret, NULL, GFP_NOIO);
5711         ret = 0;
5712
5713         if (IS_ERR(rbd_dev->header.object_prefix)) {
5714                 ret = PTR_ERR(rbd_dev->header.object_prefix);
5715                 rbd_dev->header.object_prefix = NULL;
5716         } else {
5717                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
5718         }
5719 out:
5720         kfree(reply_buf);
5721
5722         return ret;
5723 }
5724
5725 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
5726                 u64 *snap_features)
5727 {
5728         __le64 snapid = cpu_to_le64(snap_id);
5729         struct {
5730                 __le64 features;
5731                 __le64 incompat;
5732         } __attribute__ ((packed)) features_buf = { 0 };
5733         u64 unsup;
5734         int ret;
5735
5736         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5737                                   &rbd_dev->header_oloc, "get_features",
5738                                   &snapid, sizeof(snapid),
5739                                   &features_buf, sizeof(features_buf));
5740         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5741         if (ret < 0)
5742                 return ret;
5743         if (ret < sizeof (features_buf))
5744                 return -ERANGE;
5745
5746         unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
5747         if (unsup) {
5748                 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
5749                          unsup);
5750                 return -ENXIO;
5751         }
5752
5753         *snap_features = le64_to_cpu(features_buf.features);
5754
5755         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
5756                 (unsigned long long)snap_id,
5757                 (unsigned long long)*snap_features,
5758                 (unsigned long long)le64_to_cpu(features_buf.incompat));
5759
5760         return 0;
5761 }
5762
5763 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
5764 {
5765         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
5766                                                 &rbd_dev->header.features);
5767 }
5768
5769 /*
5770  * These are generic image flags, but since they are used only for
5771  * object map, store them in rbd_dev->object_map_flags.
5772  *
5773  * For the same reason, this function is called only on object map
5774  * (re)load and not on header refresh.
5775  */
5776 static int rbd_dev_v2_get_flags(struct rbd_device *rbd_dev)
5777 {
5778         __le64 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5779         __le64 flags;
5780         int ret;
5781
5782         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5783                                   &rbd_dev->header_oloc, "get_flags",
5784                                   &snapid, sizeof(snapid),
5785                                   &flags, sizeof(flags));
5786         if (ret < 0)
5787                 return ret;
5788         if (ret < sizeof(flags))
5789                 return -EBADMSG;
5790
5791         rbd_dev->object_map_flags = le64_to_cpu(flags);
5792         return 0;
5793 }
5794
5795 struct parent_image_info {
5796         u64             pool_id;
5797         const char      *pool_ns;
5798         const char      *image_id;
5799         u64             snap_id;
5800
5801         bool            has_overlap;
5802         u64             overlap;
5803 };
5804
5805 /*
5806  * The caller is responsible for @pii.
5807  */
5808 static int decode_parent_image_spec(void **p, void *end,
5809                                     struct parent_image_info *pii)
5810 {
5811         u8 struct_v;
5812         u32 struct_len;
5813         int ret;
5814
5815         ret = ceph_start_decoding(p, end, 1, "ParentImageSpec",
5816                                   &struct_v, &struct_len);
5817         if (ret)
5818                 return ret;
5819
5820         ceph_decode_64_safe(p, end, pii->pool_id, e_inval);
5821         pii->pool_ns = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5822         if (IS_ERR(pii->pool_ns)) {
5823                 ret = PTR_ERR(pii->pool_ns);
5824                 pii->pool_ns = NULL;
5825                 return ret;
5826         }
5827         pii->image_id = ceph_extract_encoded_string(p, end, NULL, GFP_KERNEL);
5828         if (IS_ERR(pii->image_id)) {
5829                 ret = PTR_ERR(pii->image_id);
5830                 pii->image_id = NULL;
5831                 return ret;
5832         }
5833         ceph_decode_64_safe(p, end, pii->snap_id, e_inval);
5834         return 0;
5835
5836 e_inval:
5837         return -EINVAL;
5838 }
5839
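/*
 * Query parent info using the "parent_get" and "parent_overlap_get"
 * class methods.  Returns 1 if the OSD doesn't support them so that
 * the caller can fall back to the legacy "get_parent" method.  The
 * caller is responsible for @pii.
 */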
5840 static int __get_parent_info(struct rbd_device *rbd_dev,
5841                              struct page *req_page,
5842                              struct page *reply_page,
5843                              struct parent_image_info *pii)
5844 {
5845         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5846         size_t reply_len = PAGE_SIZE;
5847         void *p, *end;
5848         int ret;
5849
5850         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5851                              "rbd", "parent_get", CEPH_OSD_FLAG_READ,
5852                              req_page, sizeof(u64), &reply_page, &reply_len);
5853         if (ret)
5854                 return ret == -EOPNOTSUPP ? 1 : ret;
5855
5856         p = page_address(reply_page);
5857         end = p + reply_len;
5858         ret = decode_parent_image_spec(&p, end, pii);
5859         if (ret)
5860                 return ret;
5861
5862         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5863                              "rbd", "parent_overlap_get", CEPH_OSD_FLAG_READ,
5864                              req_page, sizeof(u64), &reply_page, &reply_len);
5865         if (ret)
5866                 return ret;
5867
5868         p = page_address(reply_page);
5869         end = p + reply_len;
5870         ceph_decode_8_safe(&p, end, pii->has_overlap, e_inval);
5871         if (pii->has_overlap)
5872                 ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5873
5874         return 0;
5875
5876 e_inval:
5877         return -EINVAL;
5878 }
5879
5880 /*
5881  * The caller is responsible for @pii.
5882  */
5883 static int __get_parent_info_legacy(struct rbd_device *rbd_dev,
5884                                     struct page *req_page,
5885                                     struct page *reply_page,
5886                                     struct parent_image_info *pii)
5887 {
5888         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5889         size_t reply_len = PAGE_SIZE;
5890         void *p, *end;
5891         int ret;
5892
5893         ret = ceph_osdc_call(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
5894                              "rbd", "get_parent", CEPH_OSD_FLAG_READ,
5895                              req_page, sizeof(u64), &reply_page, &reply_len);
5896         if (ret)
5897                 return ret;
5898
5899         p = page_address(reply_page);
5900         end = p + reply_len;
5901         ceph_decode_64_safe(&p, end, pii->pool_id, e_inval);
5902         pii->image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5903         if (IS_ERR(pii->image_id)) {
5904                 ret = PTR_ERR(pii->image_id);
5905                 pii->image_id = NULL;
5906                 return ret;
5907         }
5908         ceph_decode_64_safe(&p, end, pii->snap_id, e_inval);
5909         pii->has_overlap = true;
5910         ceph_decode_64_safe(&p, end, pii->overlap, e_inval);
5911
5912         return 0;
5913
5914 e_inval:
5915         return -EINVAL;
5916 }
5917
5918 static int get_parent_info(struct rbd_device *rbd_dev,
5919                            struct parent_image_info *pii)
5920 {
5921         struct page *req_page, *reply_page;
5922         void *p;
5923         int ret;
5924
5925         req_page = alloc_page(GFP_KERNEL);
5926         if (!req_page)
5927                 return -ENOMEM;
5928
5929         reply_page = alloc_page(GFP_KERNEL);
5930         if (!reply_page) {
5931                 __free_page(req_page);
5932                 return -ENOMEM;
5933         }
5934
5935         p = page_address(req_page);
5936         ceph_encode_64(&p, rbd_dev->spec->snap_id);
5937         ret = __get_parent_info(rbd_dev, req_page, reply_page, pii);
5938         if (ret > 0)
5939                 ret = __get_parent_info_legacy(rbd_dev, req_page, reply_page,
5940                                                pii);
5941
5942         __free_page(req_page);
5943         __free_page(reply_page);
5944         return ret;
5945 }
5946
5947 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
5948 {
5949         struct rbd_spec *parent_spec;
5950         struct parent_image_info pii = { 0 };
5951         int ret;
5952
5953         parent_spec = rbd_spec_alloc();
5954         if (!parent_spec)
5955                 return -ENOMEM;
5956
5957         ret = get_parent_info(rbd_dev, &pii);
5958         if (ret)
5959                 goto out_err;
5960
5961         dout("%s pool_id %llu pool_ns %s image_id %s snap_id %llu has_overlap %d overlap %llu\n",
5962              __func__, pii.pool_id, pii.pool_ns, pii.image_id, pii.snap_id,
5963              pii.has_overlap, pii.overlap);
5964
5965         if (pii.pool_id == CEPH_NOPOOL || !pii.has_overlap) {
5966                 /*
5967                  * Either the parent never existed, or we have
5968                  * a record of it but the image got flattened so it no
5969                  * longer has a parent.  When the parent of a
5970                  * layered image disappears we immediately set the
5971                  * overlap to 0.  The effect of this is that all new
5972                  * requests will be treated as if the image had no
5973                  * parent.
5974                  *
5975                  * If !pii.has_overlap, the parent image spec is not
5976                  * applicable.  It's there to avoid duplication in each
5977                  * snapshot record.
5978                  */
5979                 if (rbd_dev->parent_overlap) {
5980                         rbd_dev->parent_overlap = 0;
5981                         rbd_dev_parent_put(rbd_dev);
5982                         pr_info("%s: clone image has been flattened\n",
5983                                 rbd_dev->disk->disk_name);
5984                 }
5985
5986                 goto out;       /* No parent?  No problem. */
5987         }
5988
5989         /* The ceph file layout needs to fit pool id in 32 bits */
5990
5991         ret = -EIO;
5992         if (pii.pool_id > (u64)U32_MAX) {
5993                 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
5994                         (unsigned long long)pii.pool_id, U32_MAX);
5995                 goto out_err;
5996         }
5997
5998         /*
5999          * The parent won't change (except when the clone is
6000          * flattened, which is handled above).  So we only need to
6001          * record the parent spec if we have not already done so.
6002          */
6003         if (!rbd_dev->parent_spec) {
6004                 parent_spec->pool_id = pii.pool_id;
6005                 if (pii.pool_ns && *pii.pool_ns) {
6006                         parent_spec->pool_ns = pii.pool_ns;
6007                         pii.pool_ns = NULL;
6008                 }
6009                 parent_spec->image_id = pii.image_id;
6010                 pii.image_id = NULL;
6011                 parent_spec->snap_id = pii.snap_id;
6012
6013                 rbd_dev->parent_spec = parent_spec;
6014                 parent_spec = NULL;     /* rbd_dev now owns this */
6015         }
6016
6017         /*
6018          * We always update the parent overlap.  If it's zero we issue
6019          * a warning, as we will proceed as if there was no parent.
6020          */
6021         if (!pii.overlap) {
6022                 if (parent_spec) {
6023                         /* refresh, careful to warn just once */
6024                         if (rbd_dev->parent_overlap)
6025                                 rbd_warn(rbd_dev,
6026                                     "clone now standalone (overlap became 0)");
6027                 } else {
6028                         /* initial probe */
6029                         rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
6030                 }
6031         }
6032         rbd_dev->parent_overlap = pii.overlap;
6033
6034 out:
6035         ret = 0;
6036 out_err:
6037         kfree(pii.pool_ns);
6038         kfree(pii.image_id);
6039         rbd_spec_put(parent_spec);
6040         return ret;
6041 }
6042
6043 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
6044 {
6045         struct {
6046                 __le64 stripe_unit;
6047                 __le64 stripe_count;
6048         } __attribute__ ((packed)) striping_info_buf = { 0 };
6049         size_t size = sizeof (striping_info_buf);
6050         void *p;
6051         int ret;
6052
6053         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6054                                 &rbd_dev->header_oloc, "get_stripe_unit_count",
6055                                 NULL, 0, &striping_info_buf, size);
6056         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6057         if (ret < 0)
6058                 return ret;
6059         if (ret < size)
6060                 return -ERANGE;
6061
6062         p = &striping_info_buf;
6063         rbd_dev->header.stripe_unit = ceph_decode_64(&p);
6064         rbd_dev->header.stripe_count = ceph_decode_64(&p);
6065         return 0;
6066 }
6067
6068 static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
6069 {
6070         __le64 data_pool_id;
6071         int ret;
6072
6073         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6074                                   &rbd_dev->header_oloc, "get_data_pool",
6075                                   NULL, 0, &data_pool_id, sizeof(data_pool_id));
6076         if (ret < 0)
6077                 return ret;
6078         if (ret < sizeof(data_pool_id))
6079                 return -EBADMSG;
6080
6081         rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
6082         WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
6083         return 0;
6084 }
6085
6086 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
6087 {
6088         CEPH_DEFINE_OID_ONSTACK(oid);
6089         size_t image_id_size;
6090         char *image_id;
6091         void *p;
6092         void *end;
6093         size_t size;
6094         void *reply_buf = NULL;
6095         size_t len = 0;
6096         char *image_name = NULL;
6097         int ret;
6098
6099         rbd_assert(!rbd_dev->spec->image_name);
6100
6101         len = strlen(rbd_dev->spec->image_id);
6102         image_id_size = sizeof (__le32) + len;
6103         image_id = kmalloc(image_id_size, GFP_KERNEL);
6104         if (!image_id)
6105                 return NULL;
6106
6107         p = image_id;
6108         end = image_id + image_id_size;
6109         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
6110
6111         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
6112         reply_buf = kmalloc(size, GFP_KERNEL);
6113         if (!reply_buf)
6114                 goto out;
6115
6116         ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
6117         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6118                                   "dir_get_name", image_id, image_id_size,
6119                                   reply_buf, size);
6120         if (ret < 0)
6121                 goto out;
6122         p = reply_buf;
6123         end = reply_buf + ret;
6124
6125         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
6126         if (IS_ERR(image_name))
6127                 image_name = NULL;
6128         else
6129                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
6130 out:
6131         kfree(reply_buf);
6132         kfree(image_id);
6133
6134         return image_name;
6135 }
6136
6137 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6138 {
6139         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6140         const char *snap_name;
6141         u32 which = 0;
6142
6143         /* Skip over names until we find the one we are looking for */
6144
6145         snap_name = rbd_dev->header.snap_names;
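        /*
         * For format 1 images, header.snap_names holds the snapshot
         * names back to back as NUL-terminated strings, in the same
         * order as snapc->snaps[].
         */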
6146         while (which < snapc->num_snaps) {
6147                 if (!strcmp(name, snap_name))
6148                         return snapc->snaps[which];
6149                 snap_name += strlen(snap_name) + 1;
6150                 which++;
6151         }
6152         return CEPH_NOSNAP;
6153 }
6154
6155 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6156 {
6157         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
6158         u32 which;
6159         bool found = false;
6160         u64 snap_id;
6161
6162         for (which = 0; !found && which < snapc->num_snaps; which++) {
6163                 const char *snap_name;
6164
6165                 snap_id = snapc->snaps[which];
6166                 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
6167                 if (IS_ERR(snap_name)) {
6168                         /* ignore no-longer existing snapshots */
6169                         if (PTR_ERR(snap_name) == -ENOENT)
6170                                 continue;
6171                         else
6172                                 break;
6173                 }
6174                 found = !strcmp(name, snap_name);
6175                 kfree(snap_name);
6176         }
6177         return found ? snap_id : CEPH_NOSNAP;
6178 }
6179
6180 /*
6181  * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
6182  * no snapshot by that name is found, or if an error occurs.
6183  */
6184 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
6185 {
6186         if (rbd_dev->image_format == 1)
6187                 return rbd_v1_snap_id_by_name(rbd_dev, name);
6188
6189         return rbd_v2_snap_id_by_name(rbd_dev, name);
6190 }
6191
6192 /*
6193  * An image being mapped will have everything but the snap id.
6194  */
6195 static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
6196 {
6197         struct rbd_spec *spec = rbd_dev->spec;
6198
6199         rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
6200         rbd_assert(spec->image_id && spec->image_name);
6201         rbd_assert(spec->snap_name);
6202
6203         if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
6204                 u64 snap_id;
6205
6206                 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
6207                 if (snap_id == CEPH_NOSNAP)
6208                         return -ENOENT;
6209
6210                 spec->snap_id = snap_id;
6211         } else {
6212                 spec->snap_id = CEPH_NOSNAP;
6213         }
6214
6215         return 0;
6216 }
6217
6218 /*
6219  * A parent image will have all ids but none of the names.
6220  *
6221  * All names in an rbd spec are dynamically allocated.  It's OK if we
6222  * can't figure out the name for an image id.
6223  */
6224 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
6225 {
6226         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
6227         struct rbd_spec *spec = rbd_dev->spec;
6228         const char *pool_name;
6229         const char *image_name;
6230         const char *snap_name;
6231         int ret;
6232
6233         rbd_assert(spec->pool_id != CEPH_NOPOOL);
6234         rbd_assert(spec->image_id);
6235         rbd_assert(spec->snap_id != CEPH_NOSNAP);
6236
6237         /* Get the pool name; we have to make our own copy of this */
6238
6239         pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
6240         if (!pool_name) {
6241                 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
6242                 return -EIO;
6243         }
6244         pool_name = kstrdup(pool_name, GFP_KERNEL);
6245         if (!pool_name)
6246                 return -ENOMEM;
6247
6248         /* Fetch the image name; tolerate failure here */
6249
6250         image_name = rbd_dev_image_name(rbd_dev);
6251         if (!image_name)
6252                 rbd_warn(rbd_dev, "unable to get image name");
6253
6254         /* Fetch the snapshot name */
6255
6256         snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
6257         if (IS_ERR(snap_name)) {
6258                 ret = PTR_ERR(snap_name);
6259                 goto out_err;
6260         }
6261
6262         spec->pool_name = pool_name;
6263         spec->image_name = image_name;
6264         spec->snap_name = snap_name;
6265
6266         return 0;
6267
6268 out_err:
6269         kfree(image_name);
6270         kfree(pool_name);
6271         return ret;
6272 }
6273
6274 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
6275 {
6276         size_t size;
6277         int ret;
6278         void *reply_buf;
6279         void *p;
6280         void *end;
6281         u64 seq;
6282         u32 snap_count;
6283         struct ceph_snap_context *snapc;
6284         u32 i;
6285
6286         /*
6287          * We'll need room for the seq value (maximum snapshot id),
6288          * snapshot count, and array of that many snapshot ids.
6289          * For now we have a fixed upper limit on the number we're
6290          * prepared to receive.
6291          */
6292         size = sizeof (__le64) + sizeof (__le32) +
6293                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
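        /*
         * With RBD_MAX_SNAP_COUNT snapshot ids this works out to
         * 8 + 4 + 510 * 8 = 4092 bytes, i.e. it fits in a 4KB buffer.
         */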
6294         reply_buf = kzalloc(size, GFP_KERNEL);
6295         if (!reply_buf)
6296                 return -ENOMEM;
6297
6298         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6299                                   &rbd_dev->header_oloc, "get_snapcontext",
6300                                   NULL, 0, reply_buf, size);
6301         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6302         if (ret < 0)
6303                 goto out;
6304
6305         p = reply_buf;
6306         end = reply_buf + ret;
6307         ret = -ERANGE;
6308         ceph_decode_64_safe(&p, end, seq, out);
6309         ceph_decode_32_safe(&p, end, snap_count, out);
6310
6311         /*
6312          * Make sure the reported number of snapshot ids wouldn't go
6313          * beyond the end of our buffer.  But before checking that,
6314          * make sure the computed size of the snapshot context we
6315          * allocate is representable in a size_t.
6316          */
6317         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
6318                                  / sizeof (u64)) {
6319                 ret = -EINVAL;
6320                 goto out;
6321         }
6322         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
6323                 goto out;
6324         ret = 0;
6325
6326         snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
6327         if (!snapc) {
6328                 ret = -ENOMEM;
6329                 goto out;
6330         }
6331         snapc->seq = seq;
6332         for (i = 0; i < snap_count; i++)
6333                 snapc->snaps[i] = ceph_decode_64(&p);
6334
6335         ceph_put_snap_context(rbd_dev->header.snapc);
6336         rbd_dev->header.snapc = snapc;
6337
6338         dout("  snap context seq = %llu, snap_count = %u\n",
6339                 (unsigned long long)seq, (unsigned int)snap_count);
6340 out:
6341         kfree(reply_buf);
6342
6343         return ret;
6344 }
6345
6346 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
6347                                         u64 snap_id)
6348 {
6349         size_t size;
6350         void *reply_buf;
6351         __le64 snapid;
6352         int ret;
6353         void *p;
6354         void *end;
6355         char *snap_name;
6356
6357         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
6358         reply_buf = kmalloc(size, GFP_KERNEL);
6359         if (!reply_buf)
6360                 return ERR_PTR(-ENOMEM);
6361
6362         snapid = cpu_to_le64(snap_id);
6363         ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
6364                                   &rbd_dev->header_oloc, "get_snapshot_name",
6365                                   &snapid, sizeof(snapid), reply_buf, size);
6366         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6367         if (ret < 0) {
6368                 snap_name = ERR_PTR(ret);
6369                 goto out;
6370         }
6371
6372         p = reply_buf;
6373         end = reply_buf + ret;
6374         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
6375         if (IS_ERR(snap_name))
6376                 goto out;
6377
6378         dout("  snap_id 0x%016llx snap_name = %s\n",
6379                 (unsigned long long)snap_id, snap_name);
6380 out:
6381         kfree(reply_buf);
6382
6383         return snap_name;
6384 }
6385
6386 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
6387 {
6388         bool first_time = rbd_dev->header.object_prefix == NULL;
6389         int ret;
6390
6391         ret = rbd_dev_v2_image_size(rbd_dev);
6392         if (ret)
6393                 return ret;
6394
6395         if (first_time) {
6396                 ret = rbd_dev_v2_header_onetime(rbd_dev);
6397                 if (ret)
6398                         return ret;
6399         }
6400
6401         ret = rbd_dev_v2_snap_context(rbd_dev);
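        /*
         * If the very first probe fails to get a snapshot context,
         * undo the one-time setup: clearing object_prefix marks the
         * header as never having been initialized.
         */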
6402         if (ret && first_time) {
6403                 kfree(rbd_dev->header.object_prefix);
6404                 rbd_dev->header.object_prefix = NULL;
6405         }
6406
6407         return ret;
6408 }
6409
6410 static int rbd_dev_header_info(struct rbd_device *rbd_dev)
6411 {
6412         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6413
6414         if (rbd_dev->image_format == 1)
6415                 return rbd_dev_v1_header_info(rbd_dev);
6416
6417         return rbd_dev_v2_header_info(rbd_dev);
6418 }
6419
6420 /*
6421  * Skips over white space at *buf, and updates *buf to point to the
6422  * first found non-space character (if any). Returns the length of
6423  * the token (string of non-white space characters) found.  Note
6424  * that *buf must be terminated with '\0'.
6425  */
6426 static inline size_t next_token(const char **buf)
6427 {
6428         /*
6429          * These are the characters that produce nonzero for
6430          * isspace() in the "C" and "POSIX" locales.
6431          */
6432         const char *spaces = " \f\n\r\t\v";
6433
6434         *buf += strspn(*buf, spaces);   /* Find start of token */
6435
6436         return strcspn(*buf, spaces);   /* Return token length */
6437 }
6438
6439 /*
6440  * Finds the next token in *buf, dynamically allocates a buffer big
6441  * enough to hold a copy of it, and copies the token into the new
6442  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
6443  * that a duplicate buffer is created even for a zero-length token.
6444  *
6445  * Returns a pointer to the newly-allocated duplicate, or a null
6446  * pointer if memory for the duplicate was not available.  If
6447  * the lenp argument is a non-null pointer, the length of the token
6448  * (not including the '\0') is returned in *lenp.
6449  *
6450  * If successful, the *buf pointer will be updated to point beyond
6451  * the end of the found token.
6452  *
6453  * Note: uses GFP_KERNEL for allocation.
6454  */
6455 static inline char *dup_token(const char **buf, size_t *lenp)
6456 {
6457         char *dup;
6458         size_t len;
6459
6460         len = next_token(buf);
6461         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
6462         if (!dup)
6463                 return NULL;
6464         *(dup + len) = '\0';
6465         *buf += len;
6466
6467         if (lenp)
6468                 *lenp = len;
6469
6470         return dup;
6471 }
6472
6473 /*
6474  * Parse the options provided for an "rbd add" (i.e., rbd image
6475  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
6476  * and the data written is passed here via a NUL-terminated buffer.
6477  * Returns 0 if successful or an error code otherwise.
6478  *
6479  * The information extracted from these options is recorded in
6480  * the other parameters which return dynamically-allocated
6481  * structures:
6482  *  ceph_opts
6483  *      The address of a pointer that will refer to a ceph options
6484  *      structure.  Caller must release the returned pointer using
6485  *      ceph_destroy_options() when it is no longer needed.
6486  *  rbd_opts
6487  *      Address of an rbd options pointer.  Fully initialized by
6488  *      this function; caller must release with kfree().
6489  *  spec
6490  *      Address of an rbd image specification pointer.  Fully
6491  *      initialized by this function based on parsed options.
6492  *      Caller must release with rbd_spec_put().
6493  *
6494  * The options passed take this form:
6495  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
6496  * where:
6497  *  <mon_addrs>
6498  *      A comma-separated list of one or more monitor addresses.
6499  *      A monitor address is an ip address, optionally followed
6500  *      by a port number (separated by a colon).
6501  *        I.e.:  ip1[:port1][,ip2[:port2]...]
6502  *  <options>
6503  *      A comma-separated list of ceph and/or rbd options.
6504  *  <pool_name>
6505  *      The name of the rados pool containing the rbd image.
6506  *  <image_name>
6507  *      The name of the image in that pool to map.
6508  *  <snap_name>
6509  *      An optional snapshot name.  If provided, the mapping will
6510  *      present data from the image at the time that snapshot was
6511  *      created.  The image head is used if no snapshot name is
6512  *      provided.  Snapshot mappings are always read-only.
6513  */
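/*
 * For example (illustrative only; the exact monitor addresses and
 * options depend on the cluster), mapping image "foo" from pool "rbd"
 * at its head might look like:
 *
 *   $ echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo -" \
 *         > /sys/bus/rbd/add
 */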
6514 static int rbd_add_parse_args(const char *buf,
6515                                 struct ceph_options **ceph_opts,
6516                                 struct rbd_options **opts,
6517                                 struct rbd_spec **rbd_spec)
6518 {
6519         size_t len;
6520         char *options;
6521         const char *mon_addrs;
6522         char *snap_name;
6523         size_t mon_addrs_size;
6524         struct parse_rbd_opts_ctx pctx = { 0 };
6525         struct ceph_options *copts;
6526         int ret;
6527
6528         /* The first four tokens are required */
6529
6530         len = next_token(&buf);
6531         if (!len) {
6532                 rbd_warn(NULL, "no monitor address(es) provided");
6533                 return -EINVAL;
6534         }
6535         mon_addrs = buf;
6536         mon_addrs_size = len + 1;
6537         buf += len;
6538
6539         ret = -EINVAL;
6540         options = dup_token(&buf, NULL);
6541         if (!options)
6542                 return -ENOMEM;
6543         if (!*options) {
6544                 rbd_warn(NULL, "no options provided");
6545                 goto out_err;
6546         }
6547
6548         pctx.spec = rbd_spec_alloc();
6549         if (!pctx.spec)
6550                 goto out_mem;
6551
6552         pctx.spec->pool_name = dup_token(&buf, NULL);
6553         if (!pctx.spec->pool_name)
6554                 goto out_mem;
6555         if (!*pctx.spec->pool_name) {
6556                 rbd_warn(NULL, "no pool name provided");
6557                 goto out_err;
6558         }
6559
6560         pctx.spec->image_name = dup_token(&buf, NULL);
6561         if (!pctx.spec->image_name)
6562                 goto out_mem;
6563         if (!*pctx.spec->image_name) {
6564                 rbd_warn(NULL, "no image name provided");
6565                 goto out_err;
6566         }
6567
6568         /*
6569          * Snapshot name is optional; default is to use "-"
6570          * (indicating the head/no snapshot).
6571          */
6572         len = next_token(&buf);
6573         if (!len) {
6574                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
6575                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
6576         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
6577                 ret = -ENAMETOOLONG;
6578                 goto out_err;
6579         }
6580         snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
6581         if (!snap_name)
6582                 goto out_mem;
6583         *(snap_name + len) = '\0';
6584         pctx.spec->snap_name = snap_name;
6585
6586         /* Initialize all rbd options to the defaults */
6587
6588         pctx.opts = kzalloc(sizeof(*pctx.opts), GFP_KERNEL);
6589         if (!pctx.opts)
6590                 goto out_mem;
6591
6592         pctx.opts->read_only = RBD_READ_ONLY_DEFAULT;
6593         pctx.opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
6594         pctx.opts->alloc_size = RBD_ALLOC_SIZE_DEFAULT;
6595         pctx.opts->lock_timeout = RBD_LOCK_TIMEOUT_DEFAULT;
6596         pctx.opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
6597         pctx.opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
6598         pctx.opts->trim = RBD_TRIM_DEFAULT;
6599
6600         copts = ceph_parse_options(options, mon_addrs,
6601                                    mon_addrs + mon_addrs_size - 1,
6602                                    parse_rbd_opts_token, &pctx);
6603         if (IS_ERR(copts)) {
6604                 ret = PTR_ERR(copts);
6605                 goto out_err;
6606         }
6607         kfree(options);
6608
6609         *ceph_opts = copts;
6610         *opts = pctx.opts;
6611         *rbd_spec = pctx.spec;
6612
6613         return 0;
6614 out_mem:
6615         ret = -ENOMEM;
6616 out_err:
6617         kfree(pctx.opts);
6618         rbd_spec_put(pctx.spec);
6619         kfree(options);
6620
6621         return ret;
6622 }
6623
6624 static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
6625 {
6626         down_write(&rbd_dev->lock_rwsem);
6627         if (__rbd_is_lock_owner(rbd_dev))
6628                 __rbd_release_lock(rbd_dev);
6629         up_write(&rbd_dev->lock_rwsem);
6630 }
6631
6632 /*
6633  * If the wait is interrupted, an error is returned even if the lock
6634  * was successfully acquired.  rbd_dev_image_unlock() will release it
6635  * if needed.
6636  */
6637 static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
6638 {
6639         long ret;
6640
6641         if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
6642                 if (!rbd_dev->opts->exclusive && !rbd_dev->opts->lock_on_read)
6643                         return 0;
6644
6645                 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
6646                 return -EINVAL;
6647         }
6648
6649         if (rbd_is_snap(rbd_dev))
6650                 return 0;
6651
6652         rbd_assert(!rbd_is_lock_owner(rbd_dev));
6653         queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
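        /*
         * wait_for_completion_killable_timeout() returns a positive
         * value if the wait completed, 0 if it timed out and a
         * negative errno if it was interrupted by a fatal signal.
         */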
6654         ret = wait_for_completion_killable_timeout(&rbd_dev->acquire_wait,
6655                             ceph_timeout_jiffies(rbd_dev->opts->lock_timeout));
6656         if (ret > 0) {
6657                 ret = rbd_dev->acquire_err;
6658         } else {
6659                 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
6660                 if (!ret)
6661                         ret = -ETIMEDOUT;
6662         }
6663
6664         if (ret) {
6665                 rbd_warn(rbd_dev, "failed to acquire exclusive lock: %ld", ret);
6666                 return ret;
6667         }
6668
6669         /*
6670          * The lock may have been released by now, unless automatic lock
6671          * transitions are disabled.
6672          */
6673         rbd_assert(!rbd_dev->opts->exclusive || rbd_is_lock_owner(rbd_dev));
6674         return 0;
6675 }
6676
6677 /*
6678  * An rbd format 2 image has a unique identifier, distinct from the
6679  * name given to it by the user.  Internally, that identifier is
6680  * what's used to specify the names of objects related to the image.
6681  *
6682  * A special "rbd id" object is used to map an rbd image name to its
6683  * id.  If that object doesn't exist, then there is no v2 rbd image
6684  * with the supplied name.
6685  *
6686  * This function will record the given rbd_dev's image_id field if
6687  * it can be determined, and in that case will return 0.  If any
6688  * errors occur a negative errno will be returned and the rbd_dev's
6689  * image_id field will be unchanged (and should be NULL).
6690  */
6691 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
6692 {
6693         int ret;
6694         size_t size;
6695         CEPH_DEFINE_OID_ONSTACK(oid);
6696         void *response;
6697         char *image_id;
6698
6699         /*
6700          * When probing a parent image, the image id is already
6701          * known (and the image name likely is not).  There's no
6702          * need to fetch the image id again in this case.  We
6703          * do still need to set the image format though.
6704          */
6705         if (rbd_dev->spec->image_id) {
6706                 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
6707
6708                 return 0;
6709         }
6710
6711         /*
6712          * First, see if the format 2 image id file exists, and if
6713          * so, get the image's persistent id from it.
6714          */
6715         ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
6716                                rbd_dev->spec->image_name);
6717         if (ret)
6718                 return ret;
6719
6720         dout("rbd id object name is %s\n", oid.name);
6721
6722         /* Response will be an encoded string, which includes a length */
6723         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
6724         response = kzalloc(size, GFP_NOIO);
6725         if (!response) {
6726                 ret = -ENOMEM;
6727                 goto out;
6728         }
6729
6730         /* If it doesn't exist we'll assume it's a format 1 image */
6731
6732         ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
6733                                   "get_id", NULL, 0,
6734                                   response, size);
6735         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
6736         if (ret == -ENOENT) {
6737                 image_id = kstrdup("", GFP_KERNEL);
6738                 ret = image_id ? 0 : -ENOMEM;
6739                 if (!ret)
6740                         rbd_dev->image_format = 1;
6741         } else if (ret >= 0) {
6742                 void *p = response;
6743
6744                 image_id = ceph_extract_encoded_string(&p, p + ret,
6745                                                 NULL, GFP_NOIO);
6746                 ret = PTR_ERR_OR_ZERO(image_id);
6747                 if (!ret)
6748                         rbd_dev->image_format = 2;
6749         }
6750
6751         if (!ret) {
6752                 rbd_dev->spec->image_id = image_id;
6753                 dout("image_id is %s\n", image_id);
6754         }
6755 out:
6756         kfree(response);
6757         ceph_oid_destroy(&oid);
6758         return ret;
6759 }
6760
6761 /*
6762  * Undo whatever state changes were made by the v1 or v2 header
6763  * info call.
6764  */
6765 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
6766 {
6767         struct rbd_image_header *header;
6768
6769         rbd_dev_parent_put(rbd_dev);
6770         rbd_object_map_free(rbd_dev);
6771         rbd_dev_mapping_clear(rbd_dev);
6772
6773         /* Free dynamic fields from the header, then zero it out */
6774
6775         header = &rbd_dev->header;
6776         ceph_put_snap_context(header->snapc);
6777         kfree(header->snap_sizes);
6778         kfree(header->snap_names);
6779         kfree(header->object_prefix);
6780         memset(header, 0, sizeof (*header));
6781 }
6782
6783 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
6784 {
6785         int ret;
6786
6787         ret = rbd_dev_v2_object_prefix(rbd_dev);
6788         if (ret)
6789                 goto out_err;
6790
6791         /*
6792          * Get and check the features for the image.  Currently the
6793          * features are assumed to never change.
6794          */
6795         ret = rbd_dev_v2_features(rbd_dev);
6796         if (ret)
6797                 goto out_err;
6798
6799         /* If the image supports fancy striping, get its parameters */
6800
6801         if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
6802                 ret = rbd_dev_v2_striping_info(rbd_dev);
6803                 if (ret < 0)
6804                         goto out_err;
6805         }
6806
6807         if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
6808                 ret = rbd_dev_v2_data_pool(rbd_dev);
6809                 if (ret)
6810                         goto out_err;
6811         }
6812
6813         rbd_init_layout(rbd_dev);
6814         return 0;
6815
6816 out_err:
6817         rbd_dev->header.features = 0;
6818         kfree(rbd_dev->header.object_prefix);
6819         rbd_dev->header.object_prefix = NULL;
6820         return ret;
6821 }
6822
6823 /*
6824  * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
6825  * rbd_dev_image_probe() recursion depth, which means it's also the
6826  * length of the already discovered part of the parent chain.
6827  */
6828 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
6829 {
6830         struct rbd_device *parent = NULL;
6831         int ret;
6832
6833         if (!rbd_dev->parent_spec)
6834                 return 0;
6835
6836         if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
6837                 pr_info("parent chain is too long (%d)\n", depth);
6838                 ret = -EINVAL;
6839                 goto out_err;
6840         }
6841
6842         parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
6843         if (!parent) {
6844                 ret = -ENOMEM;
6845                 goto out_err;
6846         }
6847
6848         /*
6849          * Images related by parent/child relationships always share
6850          * rbd_client and spec/parent_spec, so bump their refcounts.
6851          */
6852         __rbd_get_client(rbd_dev->rbd_client);
6853         rbd_spec_get(rbd_dev->parent_spec);
6854
6855         __set_bit(RBD_DEV_FLAG_READONLY, &parent->flags);
6856
6857         ret = rbd_dev_image_probe(parent, depth);
6858         if (ret < 0)
6859                 goto out_err;
6860
6861         rbd_dev->parent = parent;
6862         atomic_set(&rbd_dev->parent_ref, 1);
6863         return 0;
6864
6865 out_err:
6866         rbd_dev_unparent(rbd_dev);
6867         rbd_dev_destroy(parent);
6868         return ret;
6869 }
6870
6871 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
6872 {
6873         clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6874         rbd_free_disk(rbd_dev);
6875         if (!single_major)
6876                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6877 }
6878
6879 /*
6880  * rbd_dev->header_rwsem must be locked for write and will be unlocked
6881  * upon return.
6882  */
6883 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
6884 {
6885         int ret;
6886
6887         /* Record our major and minor device numbers. */
6888
6889         if (!single_major) {
6890                 ret = register_blkdev(0, rbd_dev->name);
6891                 if (ret < 0)
6892                         goto err_out_unlock;
6893
6894                 rbd_dev->major = ret;
6895                 rbd_dev->minor = 0;
6896         } else {
6897                 rbd_dev->major = rbd_major;
6898                 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
6899         }
6900
6901         /* Set up the blkdev mapping. */
6902
6903         ret = rbd_init_disk(rbd_dev);
6904         if (ret)
6905                 goto err_out_blkdev;
6906
6907         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6908         set_disk_ro(rbd_dev->disk, rbd_is_ro(rbd_dev));
6909
6910         ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6911         if (ret)
6912                 goto err_out_disk;
6913
6914         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6915         up_write(&rbd_dev->header_rwsem);
6916         return 0;
6917
6918 err_out_disk:
6919         rbd_free_disk(rbd_dev);
6920 err_out_blkdev:
6921         if (!single_major)
6922                 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6923 err_out_unlock:
6924         up_write(&rbd_dev->header_rwsem);
6925         return ret;
6926 }
6927
6928 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6929 {
6930         struct rbd_spec *spec = rbd_dev->spec;
6931         int ret;
6932
6933         /* Record the header object name for this rbd image. */
6934
6935         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6936         if (rbd_dev->image_format == 1)
6937                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6938                                        spec->image_name, RBD_SUFFIX);
6939         else
6940                 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6941                                        RBD_HEADER_PREFIX, spec->image_id);
6942
6943         return ret;
6944 }
6945
6946 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6947 {
6948         rbd_dev_unprobe(rbd_dev);
6949         if (rbd_dev->opts)
6950                 rbd_unregister_watch(rbd_dev);
6951         rbd_dev->image_format = 0;
6952         kfree(rbd_dev->spec->image_id);
6953         rbd_dev->spec->image_id = NULL;
6954 }
6955
6956 /*
6957  * Probe for the existence of the header object for the given rbd
6958  * device.  If this image is the one being mapped (i.e., not a
6959  * parent), initiate a watch on its header object before using that
6960  * object to get detailed information about the rbd image.
6961  */
6962 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6963 {
6964         int ret;
6965
6966         /*
6967          * Get the id from the image id object.  Unless there's an
6968          * error, rbd_dev->spec->image_id will be filled in with
6969          * a dynamically-allocated string, and rbd_dev->image_format
6970          * will be set to either 1 or 2.
6971          */
6972         ret = rbd_dev_image_id(rbd_dev);
6973         if (ret)
6974                 return ret;
6975
6976         ret = rbd_dev_header_name(rbd_dev);
6977         if (ret)
6978                 goto err_out_format;
6979
6980         if (!depth) {
6981                 ret = rbd_register_watch(rbd_dev);
6982                 if (ret) {
6983                         if (ret == -ENOENT)
6984                                 pr_info("image %s/%s%s%s does not exist\n",
6985                                         rbd_dev->spec->pool_name,
6986                                         rbd_dev->spec->pool_ns ?: "",
6987                                         rbd_dev->spec->pool_ns ? "/" : "",
6988                                         rbd_dev->spec->image_name);
6989                         goto err_out_format;
6990                 }
6991         }
6992
6993         ret = rbd_dev_header_info(rbd_dev);
6994         if (ret)
6995                 goto err_out_watch;
6996
6997         /*
6998          * If this image is the one being mapped, we have pool name and
6999          * id, image name and id, and snap name - need to fill snap id.
7000          * Otherwise this is a parent image, identified by pool, image
7001          * and snap ids - need to fill in names for those ids.
7002          */
7003         if (!depth)
7004                 ret = rbd_spec_fill_snap_id(rbd_dev);
7005         else
7006                 ret = rbd_spec_fill_names(rbd_dev);
7007         if (ret) {
7008                 if (ret == -ENOENT)
7009                         pr_info("snap %s/%s%s%s@%s does not exist\n",
7010                                 rbd_dev->spec->pool_name,
7011                                 rbd_dev->spec->pool_ns ?: "",
7012                                 rbd_dev->spec->pool_ns ? "/" : "",
7013                                 rbd_dev->spec->image_name,
7014                                 rbd_dev->spec->snap_name);
7015                 goto err_out_probe;
7016         }
7017
7018         ret = rbd_dev_mapping_set(rbd_dev);
7019         if (ret)
7020                 goto err_out_probe;
7021
7022         if (rbd_is_snap(rbd_dev) &&
7023             (rbd_dev->header.features & RBD_FEATURE_OBJECT_MAP)) {
7024                 ret = rbd_object_map_load(rbd_dev);
7025                 if (ret)
7026                         goto err_out_probe;
7027         }
7028
7029         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
7030                 ret = rbd_dev_v2_parent_info(rbd_dev);
7031                 if (ret)
7032                         goto err_out_probe;
7033         }
7034
7035         ret = rbd_dev_probe_parent(rbd_dev, depth);
7036         if (ret)
7037                 goto err_out_probe;
7038
7039         dout("discovered format %u image, header name is %s\n",
7040                 rbd_dev->image_format, rbd_dev->header_oid.name);
7041         return 0;
7042
7043 err_out_probe:
7044         rbd_dev_unprobe(rbd_dev);
7045 err_out_watch:
7046         if (!depth)
7047                 rbd_unregister_watch(rbd_dev);
7048 err_out_format:
7049         rbd_dev->image_format = 0;
7050         kfree(rbd_dev->spec->image_id);
7051         rbd_dev->spec->image_id = NULL;
7052         return ret;
7053 }
7054
7055 static ssize_t do_rbd_add(struct bus_type *bus,
7056                           const char *buf,
7057                           size_t count)
7058 {
7059         struct rbd_device *rbd_dev = NULL;
7060         struct ceph_options *ceph_opts = NULL;
7061         struct rbd_options *rbd_opts = NULL;
7062         struct rbd_spec *spec = NULL;
7063         struct rbd_client *rbdc;
7064         int rc;
7065
7066         if (!try_module_get(THIS_MODULE))
7067                 return -ENODEV;
7068
7069         /* parse add command */
7070         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
7071         if (rc < 0)
7072                 goto out;
7073
7074         rbdc = rbd_get_client(ceph_opts);
7075         if (IS_ERR(rbdc)) {
7076                 rc = PTR_ERR(rbdc);
7077                 goto err_out_args;
7078         }
7079
7080         /* pick the pool */
7081         rc = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, spec->pool_name);
7082         if (rc < 0) {
7083                 if (rc == -ENOENT)
7084                         pr_info("pool %s does not exist\n", spec->pool_name);
7085                 goto err_out_client;
7086         }
7087         spec->pool_id = (u64)rc;
7088
7089         rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
7090         if (!rbd_dev) {
7091                 rc = -ENOMEM;
7092                 goto err_out_client;
7093         }
7094         rbdc = NULL;            /* rbd_dev now owns this */
7095         spec = NULL;            /* rbd_dev now owns this */
7096         rbd_opts = NULL;        /* rbd_dev now owns this */
7097
7098         /* if we are mapping a snapshot it will be a read-only mapping */
7099         if (rbd_dev->opts->read_only ||
7100             strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME))
7101                 __set_bit(RBD_DEV_FLAG_READONLY, &rbd_dev->flags);
7102
7103         rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
7104         if (!rbd_dev->config_info) {
7105                 rc = -ENOMEM;
7106                 goto err_out_rbd_dev;
7107         }
7108
7109         down_write(&rbd_dev->header_rwsem);
7110         rc = rbd_dev_image_probe(rbd_dev, 0);
7111         if (rc < 0) {
7112                 up_write(&rbd_dev->header_rwsem);
7113                 goto err_out_rbd_dev;
7114         }
7115
7116         if (rbd_dev->opts->alloc_size > rbd_dev->layout.object_size) {
7117                 rbd_warn(rbd_dev, "alloc_size adjusted to %u",
7118                          rbd_dev->layout.object_size);
7119                 rbd_dev->opts->alloc_size = rbd_dev->layout.object_size;
7120         }
7121
7122         rc = rbd_dev_device_setup(rbd_dev);
7123         if (rc)
7124                 goto err_out_image_probe;
7125
7126         rc = rbd_add_acquire_lock(rbd_dev);
7127         if (rc)
7128                 goto err_out_image_lock;
7129
7130         /* Everything's ready.  Announce the disk to the world. */
7131
7132         rc = device_add(&rbd_dev->dev);
7133         if (rc)
7134                 goto err_out_image_lock;
7135
7136         add_disk(rbd_dev->disk);
7137         /* see rbd_init_disk() */
7138         blk_put_queue(rbd_dev->disk->queue);
7139
7140         spin_lock(&rbd_dev_list_lock);
7141         list_add_tail(&rbd_dev->node, &rbd_dev_list);
7142         spin_unlock(&rbd_dev_list_lock);
7143
7144         pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
7145                 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
7146                 rbd_dev->header.features);
7147         rc = count;
7148 out:
7149         module_put(THIS_MODULE);
7150         return rc;
7151
7152 err_out_image_lock:
7153         rbd_dev_image_unlock(rbd_dev);
7154         rbd_dev_device_release(rbd_dev);
7155 err_out_image_probe:
7156         rbd_dev_image_release(rbd_dev);
7157 err_out_rbd_dev:
7158         rbd_dev_destroy(rbd_dev);
7159 err_out_client:
7160         rbd_put_client(rbdc);
7161 err_out_args:
7162         rbd_spec_put(spec);
7163         kfree(rbd_opts);
7164         goto out;
7165 }
7166
7167 static ssize_t add_store(struct bus_type *bus, const char *buf, size_t count)
7168 {
7169         if (single_major)
7170                 return -EINVAL;
7171
7172         return do_rbd_add(bus, buf, count);
7173 }
7174
7175 static ssize_t add_single_major_store(struct bus_type *bus, const char *buf,
7176                                       size_t count)
7177 {
7178         return do_rbd_add(bus, buf, count);
7179 }
7180
7181 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
7182 {
7183         while (rbd_dev->parent) {
7184                 struct rbd_device *first = rbd_dev;
7185                 struct rbd_device *second = first->parent;
7186                 struct rbd_device *third;
7187
7188                 /*
7189                  * Follow to the parent with no grandparent and
7190                  * remove it.
7191                  */
7192                 while (second && (third = second->parent)) {
7193                         first = second;
7194                         second = third;
7195                 }
7196                 rbd_assert(second);
7197                 rbd_dev_image_release(second);
7198                 rbd_dev_destroy(second);
7199                 first->parent = NULL;
7200                 first->parent_overlap = 0;
7201
7202                 rbd_assert(first->parent_spec);
7203                 rbd_spec_put(first->parent_spec);
7204                 first->parent_spec = NULL;
7205         }
7206 }
7207
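/*
 * Handle a write to /sys/bus/rbd/remove (or remove_single_major): the
 * input is a device id, optionally followed by "force", for example
 * (illustrative only):
 *
 *   $ echo "0 force" > /sys/bus/rbd/remove
 */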
7208 static ssize_t do_rbd_remove(struct bus_type *bus,
7209                              const char *buf,
7210                              size_t count)
7211 {
7212         struct rbd_device *rbd_dev = NULL;
7213         struct list_head *tmp;
7214         int dev_id;
7215         char opt_buf[6];
7216         bool force = false;
7217         int ret;
7218
7219         dev_id = -1;
7220         opt_buf[0] = '\0';
7221         sscanf(buf, "%d %5s", &dev_id, opt_buf);
7222         if (dev_id < 0) {
7223                 pr_err("dev_id out of range\n");
7224                 return -EINVAL;
7225         }
7226         if (opt_buf[0] != '\0') {
7227                 if (!strcmp(opt_buf, "force")) {
7228                         force = true;
7229                 } else {
7230                         pr_err("bad remove option at '%s'\n", opt_buf);
7231                         return -EINVAL;
7232                 }
7233         }
7234
7235         ret = -ENOENT;
7236         spin_lock(&rbd_dev_list_lock);
7237         list_for_each(tmp, &rbd_dev_list) {
7238                 rbd_dev = list_entry(tmp, struct rbd_device, node);
7239                 if (rbd_dev->dev_id == dev_id) {
7240                         ret = 0;
7241                         break;
7242                 }
7243         }
7244         if (!ret) {
7245                 spin_lock_irq(&rbd_dev->lock);
7246                 if (rbd_dev->open_count && !force)
7247                         ret = -EBUSY;
7248                 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
7249                                           &rbd_dev->flags))
7250                         ret = -EINPROGRESS;
7251                 spin_unlock_irq(&rbd_dev->lock);
7252         }
7253         spin_unlock(&rbd_dev_list_lock);
7254         if (ret)
7255                 return ret;
7256
7257         if (force) {
7258                 /*
7259                  * Prevent new IO from being queued and wait for existing
7260                  * IO to complete/fail.
7261                  */
7262                 blk_mq_freeze_queue(rbd_dev->disk->queue);
7263                 blk_set_queue_dying(rbd_dev->disk->queue);
7264         }
7265
7266         del_gendisk(rbd_dev->disk);
7267         spin_lock(&rbd_dev_list_lock);
7268         list_del_init(&rbd_dev->node);
7269         spin_unlock(&rbd_dev_list_lock);
7270         device_del(&rbd_dev->dev);
7271
7272         rbd_dev_image_unlock(rbd_dev);
7273         rbd_dev_device_release(rbd_dev);
7274         rbd_dev_image_release(rbd_dev);
7275         rbd_dev_destroy(rbd_dev);
7276         return count;
7277 }
7278
7279 static ssize_t remove_store(struct bus_type *bus, const char *buf, size_t count)
7280 {
7281         if (single_major)
7282                 return -EINVAL;
7283
7284         return do_rbd_remove(bus, buf, count);
7285 }
7286
7287 static ssize_t remove_single_major_store(struct bus_type *bus, const char *buf,
7288                                          size_t count)
7289 {
7290         return do_rbd_remove(bus, buf, count);
7291 }
7292
7293 /*
7294  * create control files in sysfs
7295  * /sys/bus/rbd/...
7296  */
7297 static int __init rbd_sysfs_init(void)
7298 {
7299         int ret;
7300
7301         ret = device_register(&rbd_root_dev);
7302         if (ret < 0)
7303                 return ret;
7304
7305         ret = bus_register(&rbd_bus_type);
7306         if (ret < 0)
7307                 device_unregister(&rbd_root_dev);
7308
7309         return ret;
7310 }
7311
7312 static void __exit rbd_sysfs_cleanup(void)
7313 {
7314         bus_unregister(&rbd_bus_type);
7315         device_unregister(&rbd_root_dev);
7316 }
7317
7318 static int __init rbd_slab_init(void)
7319 {
7320         rbd_assert(!rbd_img_request_cache);
7321         rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
7322         if (!rbd_img_request_cache)
7323                 return -ENOMEM;
7324
7325         rbd_assert(!rbd_obj_request_cache);
7326         rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
7327         if (!rbd_obj_request_cache)
7328                 goto out_err;
7329
7330         return 0;
7331
7332 out_err:
7333         kmem_cache_destroy(rbd_img_request_cache);
7334         rbd_img_request_cache = NULL;
7335         return -ENOMEM;
7336 }
7337
7338 static void rbd_slab_exit(void)
7339 {
7340         rbd_assert(rbd_obj_request_cache);
7341         kmem_cache_destroy(rbd_obj_request_cache);
7342         rbd_obj_request_cache = NULL;
7343
7344         rbd_assert(rbd_img_request_cache);
7345         kmem_cache_destroy(rbd_img_request_cache);
7346         rbd_img_request_cache = NULL;
7347 }
7348
7349 static int __init rbd_init(void)
7350 {
7351         int rc;
7352
7353         if (!libceph_compatible(NULL)) {
7354                 rbd_warn(NULL, "libceph incompatibility (quitting)");
7355                 return -EINVAL;
7356         }
7357
7358         rc = rbd_slab_init();
7359         if (rc)
7360                 return rc;
7361
7362         /*
7363          * The number of active work items is limited by the number of
7364          * rbd devices * queue depth, so leave @max_active at default.
7365          */
7366         rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
7367         if (!rbd_wq) {
7368                 rc = -ENOMEM;
7369                 goto err_out_slab;
7370         }
7371
7372         if (single_major) {
7373                 rbd_major = register_blkdev(0, RBD_DRV_NAME);
7374                 if (rbd_major < 0) {
7375                         rc = rbd_major;
7376                         goto err_out_wq;
7377                 }
7378         }
7379
7380         rc = rbd_sysfs_init();
7381         if (rc)
7382                 goto err_out_blkdev;
7383
7384         if (single_major)
7385                 pr_info("loaded (major %d)\n", rbd_major);
7386         else
7387                 pr_info("loaded\n");
7388
7389         return 0;
7390
7391 err_out_blkdev:
7392         if (single_major)
7393                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7394 err_out_wq:
7395         destroy_workqueue(rbd_wq);
7396 err_out_slab:
7397         rbd_slab_exit();
7398         return rc;
7399 }
7400
7401 static void __exit rbd_exit(void)
7402 {
7403         ida_destroy(&rbd_dev_id_ida);
7404         rbd_sysfs_cleanup();
7405         if (single_major)
7406                 unregister_blkdev(rbd_major, RBD_DRV_NAME);
7407         destroy_workqueue(rbd_wq);
7408         rbd_slab_exit();
7409 }
7410
7411 module_init(rbd_init);
7412 module_exit(rbd_exit);
7413
7414 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
7415 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
7416 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
7417 /* following authorship retained from original osdblk.c */
7418 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
7419
7420 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
7421 MODULE_LICENSE("GPL");