Merge tag 'for-5.18/dm-changes' of git://git.kernel.org/pub/scm/linux/kernel/git...
[linux-2.6-microblaze.git] / drivers / md / dm.c
index 997ace4..021b8ff 100644 (file)
 #define DM_COOKIE_ENV_VAR_NAME "DM_COOKIE"
 #define DM_COOKIE_LENGTH 24
 
+/*
+ * For REQ_POLLED fs bio, this flag is set if we link mapped underlying
+ * dm_io into one list, and reuse bio->bi_private as the list head. Before
+ * ending this fs bio, we will recover its ->bi_private.
+ */
+#define REQ_DM_POLL_LIST       REQ_DRV
+
 static const char *_name = DM_NAME;
 
 static unsigned int major = 0;
@@ -73,16 +80,21 @@ struct clone_info {
        struct dm_io *io;
        sector_t sector;
        unsigned sector_count;
+       bool submit_as_polled;
 };
 
 #define DM_TARGET_IO_BIO_OFFSET (offsetof(struct dm_target_io, clone))
 #define DM_IO_BIO_OFFSET \
        (offsetof(struct dm_target_io, clone) + offsetof(struct dm_io, tio))
 
+static inline struct dm_target_io *clone_to_tio(struct bio *clone)
+{
+       return container_of(clone, struct dm_target_io, clone);
+}
+
 void *dm_per_bio_data(struct bio *bio, size_t data_size)
 {
-       struct dm_target_io *tio = container_of(bio, struct dm_target_io, clone);
-       if (!tio->inside_dm_io)
+       if (!dm_tio_flagged(clone_to_tio(bio), DM_TIO_INSIDE_DM_IO))
                return (char *)bio - DM_TARGET_IO_BIO_OFFSET - data_size;
        return (char *)bio - DM_IO_BIO_OFFSET - data_size;
 }
@@ -477,40 +489,78 @@ out:
 
 u64 dm_start_time_ns_from_clone(struct bio *bio)
 {
-       struct dm_target_io *tio = container_of(bio, struct dm_target_io, clone);
-       struct dm_io *io = tio->io;
-
-       return jiffies_to_nsecs(io->start_time);
+       return jiffies_to_nsecs(clone_to_tio(bio)->io->start_time);
 }
 EXPORT_SYMBOL_GPL(dm_start_time_ns_from_clone);
 
-static void start_io_acct(struct dm_io *io)
+static bool bio_is_flush_with_data(struct bio *bio)
 {
-       struct mapped_device *md = io->md;
-       struct bio *bio = io->orig_bio;
+       return ((bio->bi_opf & REQ_PREFLUSH) && bio->bi_iter.bi_size);
+}
+
+static void dm_io_acct(bool end, struct mapped_device *md, struct bio *bio,
+                      unsigned long start_time, struct dm_stats_aux *stats_aux)
+{
+       bool is_flush_with_data;
+       unsigned int bi_size;
+
+       /* If REQ_PREFLUSH set save any payload but do not account it */
+       is_flush_with_data = bio_is_flush_with_data(bio);
+       if (is_flush_with_data) {
+               bi_size = bio->bi_iter.bi_size;
+               bio->bi_iter.bi_size = 0;
+       }
+
+       if (!end)
+               bio_start_io_acct_time(bio, start_time);
+       else
+               bio_end_io_acct(bio, start_time);
 
-       bio_start_io_acct_time(bio, io->start_time);
        if (unlikely(dm_stats_used(&md->stats)))
                dm_stats_account_io(&md->stats, bio_data_dir(bio),
                                    bio->bi_iter.bi_sector, bio_sectors(bio),
-                                   false, 0, &io->stats_aux);
+                                   end, start_time, stats_aux);
+
+       /* Restore bio's payload so it does get accounted upon requeue */
+       if (is_flush_with_data)
+               bio->bi_iter.bi_size = bi_size;
 }
 
-static void end_io_acct(struct mapped_device *md, struct bio *bio,
-                       unsigned long start_time, struct dm_stats_aux *stats_aux)
+static void __dm_start_io_acct(struct dm_io *io, struct bio *bio)
 {
-       unsigned long duration = jiffies - start_time;
+       dm_io_acct(false, io->md, bio, io->start_time, &io->stats_aux);
+}
 
-       bio_end_io_acct(bio, start_time);
+static void dm_start_io_acct(struct dm_io *io, struct bio *clone)
+{
+       /* Must account IO to DM device in terms of orig_bio */
+       struct bio *bio = io->orig_bio;
 
-       if (unlikely(dm_stats_used(&md->stats)))
-               dm_stats_account_io(&md->stats, bio_data_dir(bio),
-                                   bio->bi_iter.bi_sector, bio_sectors(bio),
-                                   true, duration, stats_aux);
+       /*
+        * Ensure IO accounting is only ever started once.
+        * Expect no possibility for race unless DM_TIO_IS_DUPLICATE_BIO.
+        */
+       if (!clone ||
+           likely(!dm_tio_flagged(clone_to_tio(clone), DM_TIO_IS_DUPLICATE_BIO))) {
+               if (WARN_ON_ONCE(dm_io_flagged(io, DM_IO_ACCOUNTED)))
+                       return;
+               dm_io_set_flag(io, DM_IO_ACCOUNTED);
+       } else {
+               unsigned long flags;
+               if (dm_io_flagged(io, DM_IO_ACCOUNTED))
+                       return;
+               /* Can afford locking given DM_TIO_IS_DUPLICATE_BIO */
+               spin_lock_irqsave(&io->lock, flags);
+               dm_io_set_flag(io, DM_IO_ACCOUNTED);
+               spin_unlock_irqrestore(&io->lock, flags);
+       }
 
-       /* nudge anyone waiting on suspend queue */
-       if (unlikely(wq_has_sleeper(&md->wait)))
-               wake_up(&md->wait);
+       __dm_start_io_acct(io, bio);
+}
+
+static void dm_end_io_acct(struct dm_io *io, struct bio *bio)
+{
+       dm_io_acct(true, io->md, bio, io->start_time, &io->stats_aux);
 }
 
 static struct dm_io *alloc_io(struct mapped_device *md, struct bio *bio)
@@ -519,62 +569,80 @@ static struct dm_io *alloc_io(struct mapped_device *md, struct bio *bio)
        struct dm_target_io *tio;
        struct bio *clone;
 
-       clone = bio_alloc_bioset(GFP_NOIO, 0, &md->io_bs);
-       if (!clone)
-               return NULL;
+       clone = bio_alloc_clone(bio->bi_bdev, bio, GFP_NOIO, &md->io_bs);
 
-       tio = container_of(clone, struct dm_target_io, clone);
-       tio->inside_dm_io = true;
+       tio = clone_to_tio(clone);
+       tio->flags = 0;
+       dm_tio_set_flag(tio, DM_TIO_INSIDE_DM_IO);
        tio->io = NULL;
 
        io = container_of(tio, struct dm_io, tio);
        io->magic = DM_IO_MAGIC;
        io->status = 0;
        atomic_set(&io->io_count, 1);
-       io->orig_bio = bio;
+       this_cpu_inc(*md->pending_io);
+       io->orig_bio = NULL;
        io->md = md;
-       spin_lock_init(&io->endio_lock);
-
+       io->map_task = current;
+       spin_lock_init(&io->lock);
        io->start_time = jiffies;
+       io->flags = 0;
+
+       dm_stats_record_start(&md->stats, &io->stats_aux);
 
        return io;
 }
 
-static void free_io(struct mapped_device *md, struct dm_io *io)
+static void free_io(struct dm_io *io)
 {
        bio_put(&io->tio.clone);
 }
 
-static struct dm_target_io *alloc_tio(struct clone_info *ci, struct dm_target *ti,
-                                     unsigned target_bio_nr, gfp_t gfp_mask)
+static struct bio *alloc_tio(struct clone_info *ci, struct dm_target *ti,
+               unsigned target_bio_nr, unsigned *len, gfp_t gfp_mask)
 {
        struct dm_target_io *tio;
+       struct bio *clone;
 
        if (!ci->io->tio.io) {
                /* the dm_target_io embedded in ci->io is available */
                tio = &ci->io->tio;
+               /* alloc_io() already initialized embedded clone */
+               clone = &tio->clone;
        } else {
-               struct bio *clone = bio_alloc_bioset(gfp_mask, 0, &ci->io->md->bs);
+               clone = bio_alloc_clone(ci->bio->bi_bdev, ci->bio,
+                                       gfp_mask, &ci->io->md->bs);
                if (!clone)
                        return NULL;
 
-               tio = container_of(clone, struct dm_target_io, clone);
-               tio->inside_dm_io = false;
+               /* REQ_DM_POLL_LIST shouldn't be inherited */
+               clone->bi_opf &= ~REQ_DM_POLL_LIST;
+
+               tio = clone_to_tio(clone);
+               tio->flags = 0; /* also clears DM_TIO_INSIDE_DM_IO */
        }
 
        tio->magic = DM_TIO_MAGIC;
        tio->io = ci->io;
        tio->ti = ti;
        tio->target_bio_nr = target_bio_nr;
+       tio->len_ptr = len;
+       tio->old_sector = 0;
+
+       if (len) {
+               clone->bi_iter.bi_size = to_bytes(*len);
+               if (bio_integrity(clone))
+                       bio_integrity_trim(clone);
+       }
 
-       return tio;
+       return clone;
 }
 
-static void free_tio(struct dm_target_io *tio)
+static void free_tio(struct bio *clone)
 {
-       if (tio->inside_dm_io)
+       if (dm_tio_flagged(clone_to_tio(clone), DM_TIO_INSIDE_DM_IO))
                return;
-       bio_put(&tio->clone);
+       bio_put(clone);
 }
 
 /*
@@ -779,71 +847,100 @@ static int __noflush_suspending(struct mapped_device *md)
        return test_bit(DMF_NOFLUSH_SUSPENDING, &md->flags);
 }
 
-/*
- * Decrements the number of outstanding ios that a bio has been
- * cloned into, completing the original io if necc.
- */
-void dm_io_dec_pending(struct dm_io *io, blk_status_t error)
+static void dm_io_complete(struct dm_io *io)
 {
-       unsigned long flags;
        blk_status_t io_error;
-       struct bio *bio;
        struct mapped_device *md = io->md;
-       unsigned long start_time = 0;
-       struct dm_stats_aux stats_aux;
-
-       /* Push-back supersedes any I/O errors */
-       if (unlikely(error)) {
-               spin_lock_irqsave(&io->endio_lock, flags);
-               if (!(io->status == BLK_STS_DM_REQUEUE && __noflush_suspending(md)))
-                       io->status = error;
-               spin_unlock_irqrestore(&io->endio_lock, flags);
-       }
+       struct bio *bio = io->orig_bio;
 
-       if (atomic_dec_and_test(&io->io_count)) {
-               bio = io->orig_bio;
-               if (io->status == BLK_STS_DM_REQUEUE) {
+       if (io->status == BLK_STS_DM_REQUEUE) {
+               unsigned long flags;
+               /*
+                * Target requested pushing back the I/O.
+                */
+               spin_lock_irqsave(&md->deferred_lock, flags);
+               if (__noflush_suspending(md) &&
+                   !WARN_ON_ONCE(dm_is_zone_write(md, bio))) {
+                       /* NOTE early return due to BLK_STS_DM_REQUEUE below */
+                       bio_list_add_head(&md->deferred, bio);
+               } else {
                        /*
-                        * Target requested pushing back the I/O.
+                        * noflush suspend was interrupted or this is
+                        * a write to a zoned target.
                         */
-                       spin_lock_irqsave(&md->deferred_lock, flags);
-                       if (__noflush_suspending(md) &&
-                           !WARN_ON_ONCE(dm_is_zone_write(md, bio))) {
-                               /* NOTE early return due to BLK_STS_DM_REQUEUE below */
-                               bio_list_add_head(&md->deferred, bio);
-                       } else {
-                               /*
-                                * noflush suspend was interrupted or this is
-                                * a write to a zoned target.
-                                */
-                               io->status = BLK_STS_IOERR;
-                       }
-                       spin_unlock_irqrestore(&md->deferred_lock, flags);
+                       io->status = BLK_STS_IOERR;
                }
+               spin_unlock_irqrestore(&md->deferred_lock, flags);
+       }
 
-               io_error = io->status;
-               start_time = io->start_time;
-               stats_aux = io->stats_aux;
-               free_io(md, io);
-               end_io_acct(md, bio, start_time, &stats_aux);
+       io_error = io->status;
+       if (dm_io_flagged(io, DM_IO_ACCOUNTED))
+               dm_end_io_acct(io, bio);
+       else if (!io_error) {
+               /*
+                * Must handle target that DM_MAPIO_SUBMITTED only to
+                * then bio_endio() rather than dm_submit_bio_remap()
+                */
+               __dm_start_io_acct(io, bio);
+               dm_end_io_acct(io, bio);
+       }
+       free_io(io);
+       smp_wmb();
+       this_cpu_dec(*md->pending_io);
 
-               if (io_error == BLK_STS_DM_REQUEUE)
-                       return;
+       /* nudge anyone waiting on suspend queue */
+       if (unlikely(wq_has_sleeper(&md->wait)))
+               wake_up(&md->wait);
 
-               if ((bio->bi_opf & REQ_PREFLUSH) && bio->bi_iter.bi_size) {
-                       /*
-                        * Preflush done for flush with data, reissue
-                        * without REQ_PREFLUSH.
-                        */
-                       bio->bi_opf &= ~REQ_PREFLUSH;
-                       queue_io(md, bio);
-               } else {
-                       /* done with normal IO or empty flush */
-                       if (io_error)
-                               bio->bi_status = io_error;
-                       bio_endio(bio);
-               }
+       if (io_error == BLK_STS_DM_REQUEUE) {
+               /*
+                * Upper layer won't help us poll split bio, io->orig_bio
+                * may only reflect a subset of the pre-split original,
+                * so clear REQ_POLLED in case of requeue
+                */
+               bio->bi_opf &= ~REQ_POLLED;
+               return;
+       }
+
+       if (bio_is_flush_with_data(bio)) {
+               /*
+                * Preflush done for flush with data, reissue
+                * without REQ_PREFLUSH.
+                */
+               bio->bi_opf &= ~REQ_PREFLUSH;
+               queue_io(md, bio);
+       } else {
+               /* done with normal IO or empty flush */
+               if (io_error)
+                       bio->bi_status = io_error;
+               bio_endio(bio);
+       }
+}
+
+static inline bool dm_tio_is_normal(struct dm_target_io *tio)
+{
+       return (dm_tio_flagged(tio, DM_TIO_INSIDE_DM_IO) &&
+               !dm_tio_flagged(tio, DM_TIO_IS_DUPLICATE_BIO));
+}
+
+/*
+ * Decrements the number of outstanding ios that a bio has been
+ * cloned into, completing the original io if necc.
+ */
+void dm_io_dec_pending(struct dm_io *io, blk_status_t error)
+{
+       /* Push-back supersedes any I/O errors */
+       if (unlikely(error)) {
+               unsigned long flags;
+               spin_lock_irqsave(&io->lock, flags);
+               if (!(io->status == BLK_STS_DM_REQUEUE &&
+                     __noflush_suspending(io->md)))
+                       io->status = error;
+               spin_unlock_irqrestore(&io->lock, flags);
        }
+
+       if (atomic_dec_and_test(&io->io_count))
+               dm_io_complete(io);
 }
 
 void disable_discard(struct mapped_device *md)
@@ -879,7 +976,7 @@ static bool swap_bios_limit(struct dm_target *ti, struct bio *bio)
 static void clone_endio(struct bio *bio)
 {
        blk_status_t error = bio->bi_status;
-       struct dm_target_io *tio = container_of(bio, struct dm_target_io, clone);
+       struct dm_target_io *tio = clone_to_tio(bio);
        struct dm_io *io = tio->io;
        struct mapped_device *md = tio->io->md;
        dm_endio_fn endio = tio->ti->type->end_io;
@@ -930,7 +1027,7 @@ static void clone_endio(struct bio *bio)
                up(&md->swap_bios_semaphore);
        }
 
-       free_tio(tio);
+       free_tio(bio);
        dm_io_dec_pending(io, error);
 }
 
@@ -1057,7 +1154,8 @@ static int dm_dax_zero_page_range(struct dax_device *dax_dev, pgoff_t pgoff,
 /*
  * A target may call dm_accept_partial_bio only from the map routine.  It is
  * allowed for all bio types except REQ_PREFLUSH, REQ_OP_ZONE_* zone management
- * operations and REQ_OP_ZONE_APPEND (zone append writes).
+ * operations, REQ_OP_ZONE_APPEND (zone append writes) and any bio serviced by
+ * __send_duplicate_bios().
  *
  * dm_accept_partial_bio informs the dm that the target only wants to process
  * additional n_sectors sectors of the bio and the rest of the data should be
@@ -1085,10 +1183,10 @@ static int dm_dax_zero_page_range(struct dax_device *dax_dev, pgoff_t pgoff,
  */
 void dm_accept_partial_bio(struct bio *bio, unsigned n_sectors)
 {
-       struct dm_target_io *tio = container_of(bio, struct dm_target_io, clone);
+       struct dm_target_io *tio = clone_to_tio(bio);
        unsigned bi_size = bio->bi_iter.bi_size >> SECTOR_SHIFT;
 
-       BUG_ON(bio->bi_opf & REQ_PREFLUSH);
+       BUG_ON(dm_tio_flagged(tio, DM_TIO_IS_DUPLICATE_BIO));
        BUG_ON(op_is_zone_mgmt(bio_op(bio)));
        BUG_ON(bio_op(bio) == REQ_OP_ZONE_APPEND);
        BUG_ON(bi_size > *tio->len_ptr);
@@ -1099,6 +1197,56 @@ void dm_accept_partial_bio(struct bio *bio, unsigned n_sectors)
 }
 EXPORT_SYMBOL_GPL(dm_accept_partial_bio);
 
+static inline void __dm_submit_bio_remap(struct bio *clone,
+                                        dev_t dev, sector_t old_sector)
+{
+       trace_block_bio_remap(clone, dev, old_sector);
+       submit_bio_noacct(clone);
+}
+
+/*
+ * @clone: clone bio that DM core passed to target's .map function
+ * @tgt_clone: clone of @clone bio that target needs submitted
+ *
+ * Targets should use this interface to submit bios they take
+ * ownership of when returning DM_MAPIO_SUBMITTED.
+ *
+ * Target should also enable ti->accounts_remapped_io
+ */
+void dm_submit_bio_remap(struct bio *clone, struct bio *tgt_clone)
+{
+       struct dm_target_io *tio = clone_to_tio(clone);
+       struct dm_io *io = tio->io;
+
+       WARN_ON_ONCE(!tio->ti->accounts_remapped_io);
+
+       /* establish bio that will get submitted */
+       if (!tgt_clone)
+               tgt_clone = clone;
+
+       /*
+        * Account io->origin_bio to DM dev on behalf of target
+        * that took ownership of IO with DM_MAPIO_SUBMITTED.
+        */
+       if (io->map_task == current) {
+               /* Still in target's map function */
+               dm_io_set_flag(io, DM_IO_START_ACCT);
+       } else {
+               /*
+                * Called by another thread, managed by DM target,
+                * wait for dm_split_and_process_bio() to store
+                * io->orig_bio
+                */
+               while (unlikely(!smp_load_acquire(&io->orig_bio)))
+                       msleep(1);
+               dm_start_io_acct(io, clone);
+       }
+
+       __dm_submit_bio_remap(tgt_clone, disk_devt(io->md->disk),
+                             tio->old_sector);
+}
+EXPORT_SYMBOL_GPL(dm_submit_bio_remap);
+
 static noinline void __set_swap_bios_limit(struct mapped_device *md, int latch)
 {
        mutex_lock(&md->swap_bios_lock);
@@ -1115,23 +1263,20 @@ static noinline void __set_swap_bios_limit(struct mapped_device *md, int latch)
        mutex_unlock(&md->swap_bios_lock);
 }
 
-static void __map_bio(struct dm_target_io *tio)
+static void __map_bio(struct bio *clone)
 {
+       struct dm_target_io *tio = clone_to_tio(clone);
        int r;
-       sector_t sector;
-       struct bio *clone = &tio->clone;
        struct dm_io *io = tio->io;
        struct dm_target *ti = tio->ti;
 
        clone->bi_end_io = clone_endio;
 
        /*
-        * Map the clone.  If r == 0 we don't need to do
-        * anything, the target has assumed ownership of
-        * this io.
+        * Map the clone.
         */
        dm_io_inc_pending(io);
-       sector = clone->bi_iter.bi_sector;
+       tio->old_sector = clone->bi_iter.bi_sector;
 
        if (unlikely(swap_bios_limit(ti, clone))) {
                struct mapped_device *md = io->md;
@@ -1153,27 +1298,28 @@ static void __map_bio(struct dm_target_io *tio)
 
        switch (r) {
        case DM_MAPIO_SUBMITTED:
+               /* target has assumed ownership of this io */
+               if (!ti->accounts_remapped_io)
+                       dm_io_set_flag(io, DM_IO_START_ACCT);
                break;
        case DM_MAPIO_REMAPPED:
-               /* the bio has been remapped so dispatch it */
-               trace_block_bio_remap(clone, bio_dev(io->orig_bio), sector);
-               submit_bio_noacct(clone);
+               /*
+                * the bio has been remapped so dispatch it, but defer
+                * dm_start_io_acct() until after possible bio_split().
+                */
+               __dm_submit_bio_remap(clone, disk_devt(io->md->disk),
+                                     tio->old_sector);
+               dm_io_set_flag(io, DM_IO_START_ACCT);
                break;
        case DM_MAPIO_KILL:
-               if (unlikely(swap_bios_limit(ti, clone))) {
-                       struct mapped_device *md = io->md;
-                       up(&md->swap_bios_semaphore);
-               }
-               free_tio(tio);
-               dm_io_dec_pending(io, BLK_STS_IOERR);
-               break;
        case DM_MAPIO_REQUEUE:
-               if (unlikely(swap_bios_limit(ti, clone))) {
-                       struct mapped_device *md = io->md;
-                       up(&md->swap_bios_semaphore);
-               }
-               free_tio(tio);
-               dm_io_dec_pending(io, BLK_STS_DM_REQUEUE);
+               if (unlikely(swap_bios_limit(ti, clone)))
+                       up(&io->md->swap_bios_semaphore);
+               free_tio(clone);
+               if (r == DM_MAPIO_KILL)
+                       dm_io_dec_pending(io, BLK_STS_IOERR);
+               else
+                       dm_io_dec_pending(io, BLK_STS_DM_REQUEUE);
                break;
        default:
                DMWARN("unimplemented target map return value: %d", r);
@@ -1181,119 +1327,61 @@ static void __map_bio(struct dm_target_io *tio)
        }
 }
 
-static void bio_setup_sector(struct bio *bio, sector_t sector, unsigned len)
-{
-       bio->bi_iter.bi_sector = sector;
-       bio->bi_iter.bi_size = to_bytes(len);
-}
-
-/*
- * Creates a bio that consists of range of complete bvecs.
- */
-static int clone_bio(struct dm_target_io *tio, struct bio *bio,
-                    sector_t sector, unsigned len)
-{
-       struct bio *clone = &tio->clone;
-       int r;
-
-       __bio_clone_fast(clone, bio);
-
-       r = bio_crypt_clone(clone, bio, GFP_NOIO);
-       if (r < 0)
-               return r;
-
-       if (bio_integrity(bio)) {
-               if (unlikely(!dm_target_has_integrity(tio->ti->type) &&
-                            !dm_target_passes_integrity(tio->ti->type))) {
-                       DMWARN("%s: the target %s doesn't support integrity data.",
-                               dm_device_name(tio->io->md),
-                               tio->ti->type->name);
-                       return -EIO;
-               }
-
-               r = bio_integrity_clone(clone, bio, GFP_NOIO);
-               if (r < 0)
-                       return r;
-       }
-
-       bio_advance(clone, to_bytes(sector - clone->bi_iter.bi_sector));
-       clone->bi_iter.bi_size = to_bytes(len);
-
-       if (bio_integrity(bio))
-               bio_integrity_trim(clone);
-
-       return 0;
-}
-
 static void alloc_multiple_bios(struct bio_list *blist, struct clone_info *ci,
-                               struct dm_target *ti, unsigned num_bios)
+                               struct dm_target *ti, unsigned num_bios,
+                               unsigned *len)
 {
-       struct dm_target_io *tio;
+       struct bio *bio;
        int try;
 
-       if (!num_bios)
-               return;
-
-       if (num_bios == 1) {
-               tio = alloc_tio(ci, ti, 0, GFP_NOIO);
-               bio_list_add(blist, &tio->clone);
-               return;
-       }
-
        for (try = 0; try < 2; try++) {
                int bio_nr;
-               struct bio *bio;
 
                if (try)
                        mutex_lock(&ci->io->md->table_devices_lock);
                for (bio_nr = 0; bio_nr < num_bios; bio_nr++) {
-                       tio = alloc_tio(ci, ti, bio_nr, try ? GFP_NOIO : GFP_NOWAIT);
-                       if (!tio)
+                       bio = alloc_tio(ci, ti, bio_nr, len,
+                                       try ? GFP_NOIO : GFP_NOWAIT);
+                       if (!bio)
                                break;
 
-                       bio_list_add(blist, &tio->clone);
+                       bio_list_add(blist, bio);
                }
                if (try)
                        mutex_unlock(&ci->io->md->table_devices_lock);
                if (bio_nr == num_bios)
                        return;
 
-               while ((bio = bio_list_pop(blist))) {
-                       tio = container_of(bio, struct dm_target_io, clone);
-                       free_tio(tio);
-               }
+               while ((bio = bio_list_pop(blist)))
+                       free_tio(bio);
        }
 }
 
-static void __clone_and_map_simple_bio(struct clone_info *ci,
-                                          struct dm_target_io *tio, unsigned *len)
-{
-       struct bio *clone = &tio->clone;
-
-       tio->len_ptr = len;
-
-       __bio_clone_fast(clone, ci->bio);
-       if (len)
-               bio_setup_sector(clone, ci->sector, *len);
-       __map_bio(tio);
-}
-
 static void __send_duplicate_bios(struct clone_info *ci, struct dm_target *ti,
                                  unsigned num_bios, unsigned *len)
 {
        struct bio_list blist = BIO_EMPTY_LIST;
-       struct bio *bio;
-       struct dm_target_io *tio;
-
-       alloc_multiple_bios(&blist, ci, ti, num_bios);
+       struct bio *clone;
 
-       while ((bio = bio_list_pop(&blist))) {
-               tio = container_of(bio, struct dm_target_io, clone);
-               __clone_and_map_simple_bio(ci, tio, len);
+       switch (num_bios) {
+       case 0:
+               break;
+       case 1:
+               clone = alloc_tio(ci, ti, 0, len, GFP_NOIO);
+               dm_tio_set_flag(clone_to_tio(clone), DM_TIO_IS_DUPLICATE_BIO);
+               __map_bio(clone);
+               break;
+       default:
+               alloc_multiple_bios(&blist, ci, ti, num_bios, len);
+               while ((clone = bio_list_pop(&blist))) {
+                       dm_tio_set_flag(clone_to_tio(clone), DM_TIO_IS_DUPLICATE_BIO);
+                       __map_bio(clone);
+               }
+               break;
        }
 }
 
-static int __send_empty_flush(struct clone_info *ci)
+static void __send_empty_flush(struct clone_info *ci)
 {
        unsigned target_nr = 0;
        struct dm_target *ti;
@@ -1304,63 +1392,34 @@ static int __send_empty_flush(struct clone_info *ci)
         * need to reference it after submit. It's just used as
         * the basis for the clone(s).
         */
-       bio_init(&flush_bio, NULL, 0);
-       flush_bio.bi_opf = REQ_OP_WRITE | REQ_PREFLUSH | REQ_SYNC;
-       bio_set_dev(&flush_bio, ci->io->md->disk->part0);
+       bio_init(&flush_bio, ci->io->md->disk->part0, NULL, 0,
+                REQ_OP_WRITE | REQ_PREFLUSH | REQ_SYNC);
 
        ci->bio = &flush_bio;
        ci->sector_count = 0;
 
-       BUG_ON(bio_has_data(ci->bio));
        while ((ti = dm_table_get_target(ci->map, target_nr++)))
                __send_duplicate_bios(ci, ti, ti->num_flush_bios, NULL);
 
        bio_uninit(ci->bio);
-       return 0;
-}
-
-static int __clone_and_map_data_bio(struct clone_info *ci, struct dm_target *ti,
-                                   sector_t sector, unsigned *len)
-{
-       struct bio *bio = ci->bio;
-       struct dm_target_io *tio;
-       int r;
-
-       tio = alloc_tio(ci, ti, 0, GFP_NOIO);
-       tio->len_ptr = len;
-       r = clone_bio(tio, bio, sector, *len);
-       if (r < 0) {
-               free_tio(tio);
-               return r;
-       }
-       __map_bio(tio);
-
-       return 0;
 }
 
-static int __send_changing_extent_only(struct clone_info *ci, struct dm_target *ti,
-                                      unsigned num_bios)
+static void __send_changing_extent_only(struct clone_info *ci, struct dm_target *ti,
+                                       unsigned num_bios)
 {
        unsigned len;
 
-       /*
-        * Even though the device advertised support for this type of
-        * request, that does not mean every target supports it, and
-        * reconfiguration might also have changed that since the
-        * check was performed.
-        */
-       if (!num_bios)
-               return -EOPNOTSUPP;
-
        len = min_t(sector_t, ci->sector_count,
                    max_io_len_target_boundary(ti, dm_target_offset(ti, ci->sector)));
 
-       __send_duplicate_bios(ci, ti, num_bios, &len);
-
+       /*
+        * dm_accept_partial_bio cannot be used with duplicate bios,
+        * so update clone_info cursor before __send_duplicate_bios().
+        */
        ci->sector += len;
        ci->sector_count -= len;
 
-       return 0;
+       __send_duplicate_bios(ci, ti, num_bios, &len);
 }
 
 static bool is_abnormal_io(struct bio *bio)
@@ -1382,10 +1441,9 @@ static bool is_abnormal_io(struct bio *bio)
 static bool __process_abnormal_io(struct clone_info *ci, struct dm_target *ti,
                                  int *result)
 {
-       struct bio *bio = ci->bio;
        unsigned num_bios = 0;
 
-       switch (bio_op(bio)) {
+       switch (bio_op(ci->bio)) {
        case REQ_OP_DISCARD:
                num_bios = ti->num_discard_bios;
                break;
@@ -1402,15 +1460,68 @@ static bool __process_abnormal_io(struct clone_info *ci, struct dm_target *ti,
                return false;
        }
 
-       *result = __send_changing_extent_only(ci, ti, num_bios);
+       /*
+        * Even though the device advertised support for this type of
+        * request, that does not mean every target supports it, and
+        * reconfiguration might also have changed that since the
+        * check was performed.
+        */
+       if (!num_bios)
+               *result = -EOPNOTSUPP;
+       else {
+               __send_changing_extent_only(ci, ti, num_bios);
+               *result = 0;
+       }
        return true;
 }
 
+/*
+ * Reuse ->bi_private as hlist head for storing all dm_io instances
+ * associated with this bio, and this bio's bi_private needs to be
+ * stored in dm_io->data before the reuse.
+ *
+ * bio->bi_private is owned by fs or upper layer, so block layer won't
+ * touch it after splitting. Meantime it won't be changed by anyone after
+ * bio is submitted. So this reuse is safe.
+ */
+static inline struct hlist_head *dm_get_bio_hlist_head(struct bio *bio)
+{
+       return (struct hlist_head *)&bio->bi_private;
+}
+
+static void dm_queue_poll_io(struct bio *bio, struct dm_io *io)
+{
+       struct hlist_head *head = dm_get_bio_hlist_head(bio);
+
+       if (!(bio->bi_opf & REQ_DM_POLL_LIST)) {
+               bio->bi_opf |= REQ_DM_POLL_LIST;
+               /*
+                * Save .bi_private into dm_io, so that we can reuse
+                * .bi_private as hlist head for storing dm_io list
+                */
+               io->data = bio->bi_private;
+
+               INIT_HLIST_HEAD(head);
+
+               /* tell block layer to poll for completion */
+               bio->bi_cookie = ~BLK_QC_T_NONE;
+       } else {
+               /*
+                * bio recursed due to split, reuse original poll list,
+                * and save bio->bi_private too.
+                */
+               io->data = hlist_entry(head->first, struct dm_io, node)->data;
+       }
+
+       hlist_add_head(&io->node, head);
+}
+
 /*
  * Select the correct strategy for processing a non-flush bio.
  */
-static int __split_and_process_non_flush(struct clone_info *ci)
+static int __split_and_process_bio(struct clone_info *ci)
 {
+       struct bio *clone;
        struct dm_target *ti;
        unsigned len;
        int r;
@@ -1422,11 +1533,15 @@ static int __split_and_process_non_flush(struct clone_info *ci)
        if (__process_abnormal_io(ci, ti, &r))
                return r;
 
-       len = min_t(sector_t, max_io_len(ti, ci->sector), ci->sector_count);
+       /*
+        * Only support bio polling for normal IO, and the target io is
+        * exactly inside the dm_io instance (verified in dm_poll_dm_io)
+        */
+       ci->submit_as_polled = ci->bio->bi_opf & REQ_POLLED;
 
-       r = __clone_and_map_data_bio(ci, ti, ci->sector, &len);
-       if (r < 0)
-               return r;
+       len = min_t(sector_t, max_io_len(ti, ci->sector), ci->sector_count);
+       clone = alloc_tio(ci, ti, 0, &len, GFP_NOIO);
+       __map_bio(clone);
 
        ci->sector += len;
        ci->sector_count -= len;
@@ -1439,53 +1554,69 @@ static void init_clone_info(struct clone_info *ci, struct mapped_device *md,
 {
        ci->map = map;
        ci->io = alloc_io(md, bio);
+       ci->bio = bio;
+       ci->submit_as_polled = false;
        ci->sector = bio->bi_iter.bi_sector;
+       ci->sector_count = bio_sectors(bio);
+
+       /* Shouldn't happen but sector_count was being set to 0 so... */
+       if (WARN_ON_ONCE(op_is_zone_mgmt(bio_op(bio)) && ci->sector_count))
+               ci->sector_count = 0;
 }
 
 /*
  * Entry point to split a bio into clones and submit them to the targets.
  */
-static void __split_and_process_bio(struct mapped_device *md,
-                                       struct dm_table *map, struct bio *bio)
+static void dm_split_and_process_bio(struct mapped_device *md,
+                                    struct dm_table *map, struct bio *bio)
 {
        struct clone_info ci;
+       struct bio *orig_bio = NULL;
        int error = 0;
 
        init_clone_info(&ci, md, map, bio);
 
        if (bio->bi_opf & REQ_PREFLUSH) {
-               error = __send_empty_flush(&ci);
-               /* dm_io_dec_pending submits any data associated with flush */
-       } else if (op_is_zone_mgmt(bio_op(bio))) {
-               ci.bio = bio;
-               ci.sector_count = 0;
-               error = __split_and_process_non_flush(&ci);
-       } else {
-               ci.bio = bio;
-               ci.sector_count = bio_sectors(bio);
-               error = __split_and_process_non_flush(&ci);
-               if (ci.sector_count && !error) {
-                       /*
-                        * Remainder must be passed to submit_bio_noacct()
-                        * so that it gets handled *after* bios already submitted
-                        * have been completely processed.
-                        * We take a clone of the original to store in
-                        * ci.io->orig_bio to be used by end_io_acct() and
-                        * for dec_pending to use for completion handling.
-                        */
-                       struct bio *b = bio_split(bio, bio_sectors(bio) - ci.sector_count,
-                                                 GFP_NOIO, &md->queue->bio_split);
-                       ci.io->orig_bio = b;
-
-                       bio_chain(b, bio);
-                       trace_block_split(b, bio->bi_iter.bi_sector);
-                       submit_bio_noacct(bio);
-               }
+               __send_empty_flush(&ci);
+               /* dm_io_complete submits any data associated with flush */
+               goto out;
        }
-       start_io_acct(ci.io);
 
-       /* drop the extra reference count */
-       dm_io_dec_pending(ci.io, errno_to_blk_status(error));
+       error = __split_and_process_bio(&ci);
+       ci.io->map_task = NULL;
+       if (error || !ci.sector_count)
+               goto out;
+
+       /*
+        * Remainder must be passed to submit_bio_noacct() so it gets handled
+        * *after* bios already submitted have been completely processed.
+        * We take a clone of the original to store in ci.io->orig_bio to be
+        * used by dm_end_io_acct() and for dm_io_complete() to use for
+        * completion handling.
+        */
+       orig_bio = bio_split(bio, bio_sectors(bio) - ci.sector_count,
+                            GFP_NOIO, &md->queue->bio_split);
+       bio_chain(orig_bio, bio);
+       trace_block_split(orig_bio, bio->bi_iter.bi_sector);
+       submit_bio_noacct(bio);
+out:
+       if (!orig_bio)
+               orig_bio = bio;
+       smp_store_release(&ci.io->orig_bio, orig_bio);
+       if (dm_io_flagged(ci.io, DM_IO_START_ACCT))
+               dm_start_io_acct(ci.io, NULL);
+
+       /*
+        * Drop the extra reference count for non-POLLED bio, and hold one
+        * reference for POLLED bio, which will be released in dm_poll_bio
+        *
+        * Add every dm_io instance into the hlist_head which is stored in
+        * bio->bi_private, so that dm_poll_bio can poll them all.
+        */
+       if (error || !ci.submit_as_polled)
+               dm_io_dec_pending(ci.io, errno_to_blk_status(error));
+       else
+               dm_queue_poll_io(bio, ci.io);
 }
 
 static void dm_submit_bio(struct bio *bio)
@@ -1495,15 +1626,10 @@ static void dm_submit_bio(struct bio *bio)
        struct dm_table *map;
 
        map = dm_get_live_table(md, &srcu_idx);
-       if (unlikely(!map)) {
-               DMERR_LIMIT("%s: mapping table unavailable, erroring io",
-                           dm_device_name(md));
-               bio_io_error(bio);
-               goto out;
-       }
 
-       /* If suspended, queue this IO for later */
-       if (unlikely(test_bit(DMF_BLOCK_IO_FOR_SUSPEND, &md->flags))) {
+       /* If suspended, or map not yet available, queue this IO for later */
+       if (unlikely(test_bit(DMF_BLOCK_IO_FOR_SUSPEND, &md->flags)) ||
+           unlikely(!map)) {
                if (bio->bi_opf & REQ_NOWAIT)
                        bio_wouldblock_error(bio);
                else if (bio->bi_opf & REQ_RAHEAD)
@@ -1520,11 +1646,72 @@ static void dm_submit_bio(struct bio *bio)
        if (is_abnormal_io(bio))
                blk_queue_split(&bio);
 
-       __split_and_process_bio(md, map, bio);
+       dm_split_and_process_bio(md, map, bio);
 out:
        dm_put_live_table(md, srcu_idx);
 }
 
+static bool dm_poll_dm_io(struct dm_io *io, struct io_comp_batch *iob,
+                         unsigned int flags)
+{
+       WARN_ON_ONCE(!dm_tio_is_normal(&io->tio));
+
+       /* don't poll if the mapped io is done */
+       if (atomic_read(&io->io_count) > 1)
+               bio_poll(&io->tio.clone, iob, flags);
+
+       /* bio_poll holds the last reference */
+       return atomic_read(&io->io_count) == 1;
+}
+
+static int dm_poll_bio(struct bio *bio, struct io_comp_batch *iob,
+                      unsigned int flags)
+{
+       struct hlist_head *head = dm_get_bio_hlist_head(bio);
+       struct hlist_head tmp = HLIST_HEAD_INIT;
+       struct hlist_node *next;
+       struct dm_io *io;
+
+       /* Only poll normal bio which was marked as REQ_DM_POLL_LIST */
+       if (!(bio->bi_opf & REQ_DM_POLL_LIST))
+               return 0;
+
+       WARN_ON_ONCE(hlist_empty(head));
+
+       hlist_move_list(head, &tmp);
+
+       /*
+        * Restore .bi_private before possibly completing dm_io.
+        *
+        * bio_poll() is only possible once @bio has been completely
+        * submitted via submit_bio_noacct()'s depth-first submission.
+        * So there is no dm_queue_poll_io() race associated with
+        * clearing REQ_DM_POLL_LIST here.
+        */
+       bio->bi_opf &= ~REQ_DM_POLL_LIST;
+       bio->bi_private = hlist_entry(tmp.first, struct dm_io, node)->data;
+
+       hlist_for_each_entry_safe(io, next, &tmp, node) {
+               if (dm_poll_dm_io(io, iob, flags)) {
+                       hlist_del_init(&io->node);
+                       /*
+                        * clone_endio() has already occurred, so passing
+                        * error as 0 here doesn't override io->status
+                        */
+                       dm_io_dec_pending(io, 0);
+               }
+       }
+
+       /* Not done? */
+       if (!hlist_empty(&tmp)) {
+               bio->bi_opf |= REQ_DM_POLL_LIST;
+               /* Reset bio->bi_private to dm_io list head */
+               hlist_move_list(&tmp, head);
+               return 0;
+       }
+       return 1;
+}
+
 /*-----------------------------------------------------------------
  * An IDR is used to keep track of allocated minor numbers.
  *---------------------------------------------------------------*/
@@ -1607,6 +1794,7 @@ static void cleanup_mapped_device(struct mapped_device *md)
                md->dax_dev = NULL;
        }
 
+       dm_cleanup_zoned_dev(md);
        if (md->disk) {
                spin_lock(&_minor_lock);
                md->disk->private_data = NULL;
@@ -1619,6 +1807,11 @@ static void cleanup_mapped_device(struct mapped_device *md)
                blk_cleanup_disk(md->disk);
        }
 
+       if (md->pending_io) {
+               free_percpu(md->pending_io);
+               md->pending_io = NULL;
+       }
+
        cleanup_srcu_struct(&md->io_barrier);
 
        mutex_destroy(&md->suspend_lock);
@@ -1627,7 +1820,6 @@ static void cleanup_mapped_device(struct mapped_device *md)
        mutex_destroy(&md->swap_bios_lock);
 
        dm_mq_cleanup_mapped_device(md);
-       dm_cleanup_zoned_dev(md);
 }
 
 /*
@@ -1721,6 +1913,10 @@ static struct mapped_device *alloc_dev(int minor)
        if (!md->wq)
                goto bad;
 
+       md->pending_io = alloc_percpu(unsigned long);
+       if (!md->pending_io)
+               goto bad;
+
        dm_stats_init(&md->stats);
 
        /* Populate the mapping, nobody knows we exist yet */
@@ -1830,8 +2026,6 @@ static struct dm_table *__bind(struct mapped_device *md, struct dm_table *t,
                               struct queue_limits *limits)
 {
        struct dm_table *old_map;
-       struct request_queue *q = md->queue;
-       bool request_based = dm_table_request_based(t);
        sector_t size;
        int ret;
 
@@ -1852,7 +2046,7 @@ static struct dm_table *__bind(struct mapped_device *md, struct dm_table *t,
 
        dm_table_event_callback(t, event_callback, md);
 
-       if (request_based) {
+       if (dm_table_request_based(t)) {
                /*
                 * Leverage the fact that request-based DM targets are
                 * immutable singletons - used to optimize dm_mq_queue_rq.
@@ -1866,7 +2060,7 @@ static struct dm_table *__bind(struct mapped_device *md, struct dm_table *t,
                goto out;
        }
 
-       ret = dm_table_set_restrictions(t, q, limits);
+       ret = dm_table_set_restrictions(t, md->queue, limits);
        if (ret) {
                old_map = ERR_PTR(ret);
                goto out;
@@ -1878,7 +2072,6 @@ static struct dm_table *__bind(struct mapped_device *md, struct dm_table *t,
 
        if (old_map)
                dm_sync_table(md);
-
 out:
        return old_map;
 }
@@ -2128,16 +2321,13 @@ void dm_put(struct mapped_device *md)
 }
 EXPORT_SYMBOL_GPL(dm_put);
 
-static bool md_in_flight_bios(struct mapped_device *md)
+static bool dm_in_flight_bios(struct mapped_device *md)
 {
        int cpu;
-       struct block_device *part = dm_disk(md)->part0;
-       long sum = 0;
+       unsigned long sum = 0;
 
-       for_each_possible_cpu(cpu) {
-               sum += part_stat_local_read_cpu(part, in_flight[0], cpu);
-               sum += part_stat_local_read_cpu(part, in_flight[1], cpu);
-       }
+       for_each_possible_cpu(cpu)
+               sum += *per_cpu_ptr(md->pending_io, cpu);
 
        return sum != 0;
 }
@@ -2150,7 +2340,7 @@ static int dm_wait_for_bios_completion(struct mapped_device *md, unsigned int ta
        while (true) {
                prepare_to_wait(&md->wait, &wait, task_state);
 
-               if (!md_in_flight_bios(md))
+               if (!dm_in_flight_bios(md))
                        break;
 
                if (signal_pending_state(task_state, current)) {
@@ -2162,6 +2352,8 @@ static int dm_wait_for_bios_completion(struct mapped_device *md, unsigned int ta
        }
        finish_wait(&md->wait, &wait);
 
+       smp_rmb();
+
        return r;
 }
 
@@ -2333,11 +2525,11 @@ static int __dm_suspend(struct mapped_device *md, struct dm_table *map,
        /*
         * Here we must make sure that no processes are submitting requests
         * to target drivers i.e. no one may be executing
-        * __split_and_process_bio from dm_submit_bio.
+        * dm_split_and_process_bio from dm_submit_bio.
         *
-        * To get all processes out of __split_and_process_bio in dm_submit_bio,
+        * To get all processes out of dm_split_and_process_bio in dm_submit_bio,
         * we take the write lock. To prevent any process from reentering
-        * __split_and_process_bio from dm_submit_bio and quiesce the thread
+        * dm_split_and_process_bio from dm_submit_bio and quiesce the thread
         * (dm_wq_work), we set DMF_BLOCK_IO_FOR_SUSPEND and call
         * flush_workqueue(md->wq).
         */
@@ -2945,6 +3137,7 @@ static const struct pr_ops dm_pr_ops = {
 
 static const struct block_device_operations dm_blk_dops = {
        .submit_bio = dm_submit_bio,
+       .poll_bio = dm_poll_bio,
        .open = dm_blk_open,
        .release = dm_blk_close,
        .ioctl = dm_blk_ioctl,