Merge tag 'block-5.11-2020-12-23' of git://git.kernel.dk/linux-block
[linux-2.6-microblaze.git] / drivers / block / rnbd / rnbd-clt.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  * RDMA Network Block Driver
4  *
5  * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6  * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7  * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8  */
9
10 #undef pr_fmt
11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12
13 #include <linux/module.h>
14 #include <linux/blkdev.h>
15 #include <linux/hdreg.h>
16 #include <linux/scatterlist.h>
17 #include <linux/idr.h>
18
19 #include "rnbd-clt.h"
20
21 MODULE_DESCRIPTION("RDMA Network Block Device Client");
22 MODULE_LICENSE("GPL");
23
24 static int rnbd_client_major;
25 static DEFINE_IDA(index_ida);
26 static DEFINE_MUTEX(ida_lock);
27 static DEFINE_MUTEX(sess_lock);
28 static LIST_HEAD(sess_list);
29
30 /*
31  * Maximum number of partitions an instance can have.
32  * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
33  */
34 #define RNBD_PART_BITS          6
35
36 static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
37 {
38         return refcount_inc_not_zero(&sess->refcount);
39 }
40
41 static void free_sess(struct rnbd_clt_session *sess);
42
43 static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
44 {
45         might_sleep();
46
47         if (refcount_dec_and_test(&sess->refcount))
48                 free_sess(sess);
49 }
50
51 static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev)
52 {
53         might_sleep();
54
55         if (!refcount_dec_and_test(&dev->refcount))
56                 return;
57
58         mutex_lock(&ida_lock);
59         ida_simple_remove(&index_ida, dev->clt_device_id);
60         mutex_unlock(&ida_lock);
61         kfree(dev->hw_queues);
62         kfree(dev->pathname);
63         rnbd_clt_put_sess(dev->sess);
64         mutex_destroy(&dev->lock);
65         kfree(dev);
66 }
67
68 static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev)
69 {
70         return refcount_inc_not_zero(&dev->refcount);
71 }
72
73 static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev,
74                                  const struct rnbd_msg_open_rsp *rsp)
75 {
76         struct rnbd_clt_session *sess = dev->sess;
77
78         if (!rsp->logical_block_size)
79                 return -EINVAL;
80
81         dev->device_id              = le32_to_cpu(rsp->device_id);
82         dev->nsectors               = le64_to_cpu(rsp->nsectors);
83         dev->logical_block_size     = le16_to_cpu(rsp->logical_block_size);
84         dev->physical_block_size    = le16_to_cpu(rsp->physical_block_size);
85         dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors);
86         dev->max_discard_sectors    = le32_to_cpu(rsp->max_discard_sectors);
87         dev->discard_granularity    = le32_to_cpu(rsp->discard_granularity);
88         dev->discard_alignment      = le32_to_cpu(rsp->discard_alignment);
89         dev->secure_discard         = le16_to_cpu(rsp->secure_discard);
90         dev->rotational             = rsp->rotational;
91         dev->wc                     = !!(rsp->cache_policy & RNBD_WRITEBACK);
92         dev->fua                    = !!(rsp->cache_policy & RNBD_FUA);
93
94         dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE;
95         dev->max_segments = BMAX_SEGMENTS;
96
97         return 0;
98 }
99
100 static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev,
101                                     size_t new_nsectors)
102 {
103         rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n",
104                        dev->nsectors, new_nsectors);
105         dev->nsectors = new_nsectors;
106         set_capacity_and_notify(dev->gd, dev->nsectors);
107         return 0;
108 }
109
110 static int process_msg_open_rsp(struct rnbd_clt_dev *dev,
111                                 struct rnbd_msg_open_rsp *rsp)
112 {
113         int err = 0;
114
115         mutex_lock(&dev->lock);
116         if (dev->dev_state == DEV_STATE_UNMAPPED) {
117                 rnbd_clt_info(dev,
118                                "Ignoring Open-Response message from server for  unmapped device\n");
119                 err = -ENOENT;
120                 goto out;
121         }
122         if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) {
123                 u64 nsectors = le64_to_cpu(rsp->nsectors);
124
125                 /*
126                  * If the device was remapped and the size changed in the
127                  * meantime we need to revalidate it
128                  */
129                 if (dev->nsectors != nsectors)
130                         rnbd_clt_change_capacity(dev, nsectors);
131                 rnbd_clt_info(dev, "Device online, device remapped successfully\n");
132         }
133         err = rnbd_clt_set_dev_attr(dev, rsp);
134         if (err)
135                 goto out;
136         dev->dev_state = DEV_STATE_MAPPED;
137
138 out:
139         mutex_unlock(&dev->lock);
140
141         return err;
142 }
143
144 int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize)
145 {
146         int ret = 0;
147
148         mutex_lock(&dev->lock);
149         if (dev->dev_state != DEV_STATE_MAPPED) {
150                 pr_err("Failed to set new size of the device, device is not opened\n");
151                 ret = -ENOENT;
152                 goto out;
153         }
154         ret = rnbd_clt_change_capacity(dev, newsize);
155
156 out:
157         mutex_unlock(&dev->lock);
158
159         return ret;
160 }
161
162 static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
163 {
164         if (WARN_ON(!q->hctx))
165                 return;
166
167         /* We can come here from interrupt, thus async=true */
168         blk_mq_run_hw_queue(q->hctx, true);
169 }
170
171 enum {
172         RNBD_DELAY_IFBUSY = -1,
173 };
174
175 /**
176  * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun
177  * @sess:       Session to find a queue for
178  * @cpu:        Cpu to start the search from
179  *
180  * Description:
181  *     Each CPU has a list of HW queues, which needs to be rerun.  If a list
182  *     is not empty - it is marked with a bit.  This function finds first
183  *     set bit in a bitmap and returns corresponding CPU list.
184  */
185 static struct rnbd_cpu_qlist *
186 rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
187 {
188         int bit;
189
190         /* Search from cpu to nr_cpu_ids */
191         bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu);
192         if (bit < nr_cpu_ids) {
193                 return per_cpu_ptr(sess->cpu_queues, bit);
194         } else if (cpu != 0) {
195                 /* Search from 0 to cpu */
196                 bit = find_next_bit(sess->cpu_queues_bm, cpu, 0);
197                 if (bit < cpu)
198                         return per_cpu_ptr(sess->cpu_queues, bit);
199         }
200
201         return NULL;
202 }
203
204 static inline int nxt_cpu(int cpu)
205 {
206         return (cpu + 1) % nr_cpu_ids;
207 }
208
209 /**
210  * rnbd_rerun_if_needed() - rerun next queue marked as stopped
211  * @sess:       Session to rerun a queue on
212  *
213  * Description:
214  *     Each CPU has it's own list of HW queues, which should be rerun.
215  *     Function finds such list with HW queues, takes a list lock, picks up
216  *     the first HW queue out of the list and requeues it.
217  *
218  * Return:
219  *     True if the queue was requeued, false otherwise.
220  *
221  * Context:
222  *     Does not matter.
223  */
224 static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)
225 {
226         struct rnbd_queue *q = NULL;
227         struct rnbd_cpu_qlist *cpu_q;
228         unsigned long flags;
229         int *cpup;
230
231         /*
232          * To keep fairness and not to let other queues starve we always
233          * try to wake up someone else in round-robin manner.  That of course
234          * increases latency but queues always have a chance to be executed.
235          */
236         cpup = get_cpu_ptr(sess->cpu_rr);
237         for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
238              cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) {
239                 if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
240                         continue;
241                 if (unlikely(!test_bit(cpu_q->cpu, sess->cpu_queues_bm)))
242                         goto unlock;
243                 q = list_first_entry_or_null(&cpu_q->requeue_list,
244                                              typeof(*q), requeue_list);
245                 if (WARN_ON(!q))
246                         goto clear_bit;
247                 list_del_init(&q->requeue_list);
248                 clear_bit_unlock(0, &q->in_list);
249
250                 if (list_empty(&cpu_q->requeue_list)) {
251                         /* Clear bit if nothing is left */
252 clear_bit:
253                         clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
254                 }
255 unlock:
256                 spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
257
258                 if (q)
259                         break;
260         }
261
262         /**
263          * Saves the CPU that is going to be requeued on the per-cpu var. Just
264          * incrementing it doesn't work because rnbd_get_cpu_qlist() will
265          * always return the first CPU with something on the queue list when the
266          * value stored on the var is greater than the last CPU with something
267          * on the list.
268          */
269         if (cpu_q)
270                 *cpup = cpu_q->cpu;
271         put_cpu_var(sess->cpu_rr);
272
273         if (q)
274                 rnbd_clt_dev_requeue(q);
275
276         return q;
277 }
278
279 /**
280  * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
281  *                               session is idling (there are no requests
282  *                               in-flight).
283  * @sess:       Session to rerun the queues on
284  *
285  * Description:
286  *     This function tries to rerun all stopped queues if there are no
287  *     requests in-flight anymore.  This function tries to solve an obvious
288  *     problem, when number of tags < than number of queues (hctx), which
289  *     are stopped and put to sleep.  If last permit, which has been just put,
290  *     does not wake up all left queues (hctxs), IO requests hang forever.
291  *
292  *     That can happen when all number of permits, say N, have been exhausted
293  *     from one CPU, and we have many block devices per session, say M.
294  *     Each block device has it's own queue (hctx) for each CPU, so eventually
295  *     we can put that number of queues (hctxs) to sleep: M x nr_cpu_ids.
296  *     If number of permits N < M x nr_cpu_ids finally we will get an IO hang.
297  *
298  *     To avoid this hang last caller of rnbd_put_permit() (last caller is the
299  *     one who observes sess->busy == 0) must wake up all remaining queues.
300  *
301  * Context:
302  *     Does not matter.
303  */
304 static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
305 {
306         bool requeued;
307
308         do {
309                 requeued = rnbd_rerun_if_needed(sess);
310         } while (atomic_read(&sess->busy) == 0 && requeued);
311 }
312
313 static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess,
314                                              enum rtrs_clt_con_type con_type,
315                                              int wait)
316 {
317         struct rtrs_permit *permit;
318
319         permit = rtrs_clt_get_permit(sess->rtrs, con_type,
320                                       wait ? RTRS_PERMIT_WAIT :
321                                       RTRS_PERMIT_NOWAIT);
322         if (likely(permit))
323                 /* We have a subtle rare case here, when all permits can be
324                  * consumed before busy counter increased.  This is safe,
325                  * because loser will get NULL as a permit, observe 0 busy
326                  * counter and immediately restart the queue himself.
327                  */
328                 atomic_inc(&sess->busy);
329
330         return permit;
331 }
332
333 static void rnbd_put_permit(struct rnbd_clt_session *sess,
334                              struct rtrs_permit *permit)
335 {
336         rtrs_clt_put_permit(sess->rtrs, permit);
337         atomic_dec(&sess->busy);
338         /* Paired with rnbd_clt_dev_add_to_requeue().  Decrement first
339          * and then check queue bits.
340          */
341         smp_mb__after_atomic();
342         rnbd_rerun_all_if_idle(sess);
343 }
344
345 static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess,
346                                      enum rtrs_clt_con_type con_type,
347                                      int wait)
348 {
349         struct rnbd_iu *iu;
350         struct rtrs_permit *permit;
351
352         iu = kzalloc(sizeof(*iu), GFP_KERNEL);
353         if (!iu) {
354                 return NULL;
355         }
356
357         permit = rnbd_get_permit(sess, con_type,
358                                   wait ? RTRS_PERMIT_WAIT :
359                                   RTRS_PERMIT_NOWAIT);
360         if (unlikely(!permit)) {
361                 kfree(iu);
362                 return NULL;
363         }
364
365         iu->permit = permit;
366         /*
367          * 1st reference is dropped after finishing sending a "user" message,
368          * 2nd reference is dropped after confirmation with the response is
369          * returned.
370          * 1st and 2nd can happen in any order, so the rnbd_iu should be
371          * released (rtrs_permit returned to rtrs) only after both
372          * are finished.
373          */
374         atomic_set(&iu->refcount, 2);
375         init_waitqueue_head(&iu->comp.wait);
376         iu->comp.errno = INT_MAX;
377
378         return iu;
379 }
380
381 static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu)
382 {
383         if (atomic_dec_and_test(&iu->refcount)) {
384                 rnbd_put_permit(sess, iu->permit);
385                 kfree(iu);
386         }
387 }
388
389 static void rnbd_softirq_done_fn(struct request *rq)
390 {
391         struct rnbd_clt_dev *dev        = rq->rq_disk->private_data;
392         struct rnbd_clt_session *sess   = dev->sess;
393         struct rnbd_iu *iu;
394
395         iu = blk_mq_rq_to_pdu(rq);
396         sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
397         rnbd_put_permit(sess, iu->permit);
398         blk_mq_end_request(rq, errno_to_blk_status(iu->errno));
399 }
400
401 static void msg_io_conf(void *priv, int errno)
402 {
403         struct rnbd_iu *iu = priv;
404         struct rnbd_clt_dev *dev = iu->dev;
405         struct request *rq = iu->rq;
406         int rw = rq_data_dir(rq);
407
408         iu->errno = errno;
409
410         blk_mq_complete_request(rq);
411
412         if (errno)
413                 rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n",
414                                  rw == READ ? "read" : "write", errno);
415 }
416
417 static void wake_up_iu_comp(struct rnbd_iu *iu, int errno)
418 {
419         iu->comp.errno = errno;
420         wake_up(&iu->comp.wait);
421 }
422
423 static void msg_conf(void *priv, int errno)
424 {
425         struct rnbd_iu *iu = priv;
426
427         iu->errno = errno;
428         schedule_work(&iu->work);
429 }
430
431 enum wait_type {
432         NO_WAIT = 0,
433         WAIT    = 1
434 };
435
436 static int send_usr_msg(struct rtrs_clt *rtrs, int dir,
437                         struct rnbd_iu *iu, struct kvec *vec,
438                         size_t len, struct scatterlist *sg, unsigned int sg_len,
439                         void (*conf)(struct work_struct *work),
440                         int *errno, enum wait_type wait)
441 {
442         int err;
443         struct rtrs_clt_req_ops req_ops;
444
445         INIT_WORK(&iu->work, conf);
446         req_ops = (struct rtrs_clt_req_ops) {
447                 .priv = iu,
448                 .conf_fn = msg_conf,
449         };
450         err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit,
451                                 vec, 1, len, sg, sg_len);
452         if (!err && wait) {
453                 wait_event(iu->comp.wait, iu->comp.errno != INT_MAX);
454                 *errno = iu->comp.errno;
455         } else {
456                 *errno = 0;
457         }
458
459         return err;
460 }
461
462 static void msg_close_conf(struct work_struct *work)
463 {
464         struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
465         struct rnbd_clt_dev *dev = iu->dev;
466
467         wake_up_iu_comp(iu, iu->errno);
468         rnbd_put_iu(dev->sess, iu);
469         rnbd_clt_put_dev(dev);
470 }
471
472 static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, bool wait)
473 {
474         struct rnbd_clt_session *sess = dev->sess;
475         struct rnbd_msg_close msg;
476         struct rnbd_iu *iu;
477         struct kvec vec = {
478                 .iov_base = &msg,
479                 .iov_len  = sizeof(msg)
480         };
481         int err, errno;
482
483         iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
484         if (!iu)
485                 return -ENOMEM;
486
487         iu->buf = NULL;
488         iu->dev = dev;
489
490         sg_alloc_table(&iu->sgt, 1, GFP_KERNEL);
491
492         msg.hdr.type    = cpu_to_le16(RNBD_MSG_CLOSE);
493         msg.device_id   = cpu_to_le32(device_id);
494
495         WARN_ON(!rnbd_clt_get_dev(dev));
496         err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0,
497                            msg_close_conf, &errno, wait);
498         if (err) {
499                 rnbd_clt_put_dev(dev);
500                 rnbd_put_iu(sess, iu);
501         } else {
502                 err = errno;
503         }
504
505         sg_free_table(&iu->sgt);
506         rnbd_put_iu(sess, iu);
507         return err;
508 }
509
510 static void msg_open_conf(struct work_struct *work)
511 {
512         struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
513         struct rnbd_msg_open_rsp *rsp = iu->buf;
514         struct rnbd_clt_dev *dev = iu->dev;
515         int errno = iu->errno;
516
517         if (errno) {
518                 rnbd_clt_err(dev,
519                               "Opening failed, server responded: %d\n",
520                               errno);
521         } else {
522                 errno = process_msg_open_rsp(dev, rsp);
523                 if (errno) {
524                         u32 device_id = le32_to_cpu(rsp->device_id);
525                         /*
526                          * If server thinks its fine, but we fail to process
527                          * then be nice and send a close to server.
528                          */
529                         (void)send_msg_close(dev, device_id, NO_WAIT);
530                 }
531         }
532         kfree(rsp);
533         wake_up_iu_comp(iu, errno);
534         rnbd_put_iu(dev->sess, iu);
535         rnbd_clt_put_dev(dev);
536 }
537
538 static void msg_sess_info_conf(struct work_struct *work)
539 {
540         struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
541         struct rnbd_msg_sess_info_rsp *rsp = iu->buf;
542         struct rnbd_clt_session *sess = iu->sess;
543
544         if (!iu->errno)
545                 sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR);
546
547         kfree(rsp);
548         wake_up_iu_comp(iu, iu->errno);
549         rnbd_put_iu(sess, iu);
550         rnbd_clt_put_sess(sess);
551 }
552
553 static int send_msg_open(struct rnbd_clt_dev *dev, bool wait)
554 {
555         struct rnbd_clt_session *sess = dev->sess;
556         struct rnbd_msg_open_rsp *rsp;
557         struct rnbd_msg_open msg;
558         struct rnbd_iu *iu;
559         struct kvec vec = {
560                 .iov_base = &msg,
561                 .iov_len  = sizeof(msg)
562         };
563         int err, errno;
564
565         rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
566         if (!rsp)
567                 return -ENOMEM;
568
569         iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
570         if (!iu) {
571                 kfree(rsp);
572                 return -ENOMEM;
573         }
574
575         iu->buf = rsp;
576         iu->dev = dev;
577
578         sg_alloc_table(&iu->sgt, 1, GFP_KERNEL);
579         sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
580
581         msg.hdr.type    = cpu_to_le16(RNBD_MSG_OPEN);
582         msg.access_mode = dev->access_mode;
583         strlcpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));
584
585         WARN_ON(!rnbd_clt_get_dev(dev));
586         err = send_usr_msg(sess->rtrs, READ, iu,
587                            &vec, sizeof(*rsp), iu->sgt.sgl, 1,
588                            msg_open_conf, &errno, wait);
589         if (err) {
590                 rnbd_clt_put_dev(dev);
591                 rnbd_put_iu(sess, iu);
592                 kfree(rsp);
593         } else {
594                 err = errno;
595         }
596
597         sg_free_table(&iu->sgt);
598         rnbd_put_iu(sess, iu);
599         return err;
600 }
601
602 static int send_msg_sess_info(struct rnbd_clt_session *sess, bool wait)
603 {
604         struct rnbd_msg_sess_info_rsp *rsp;
605         struct rnbd_msg_sess_info msg;
606         struct rnbd_iu *iu;
607         struct kvec vec = {
608                 .iov_base = &msg,
609                 .iov_len  = sizeof(msg)
610         };
611         int err, errno;
612
613         rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
614         if (!rsp)
615                 return -ENOMEM;
616
617         iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
618         if (!iu) {
619                 kfree(rsp);
620                 return -ENOMEM;
621         }
622
623         iu->buf = rsp;
624         iu->sess = sess;
625
626         sg_alloc_table(&iu->sgt, 1, GFP_KERNEL);
627         sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
628
629         msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO);
630         msg.ver      = RNBD_PROTO_VER_MAJOR;
631
632         if (!rnbd_clt_get_sess(sess)) {
633                 /*
634                  * That can happen only in one case, when RTRS has restablished
635                  * the connection and link_ev() is called, but session is almost
636                  * dead, last reference on session is put and caller is waiting
637                  * for RTRS to close everything.
638                  */
639                 err = -ENODEV;
640                 goto put_iu;
641         }
642         err = send_usr_msg(sess->rtrs, READ, iu,
643                            &vec, sizeof(*rsp), iu->sgt.sgl, 1,
644                            msg_sess_info_conf, &errno, wait);
645         if (err) {
646                 rnbd_clt_put_sess(sess);
647 put_iu:
648                 rnbd_put_iu(sess, iu);
649                 kfree(rsp);
650         } else {
651                 err = errno;
652         }
653         sg_free_table(&iu->sgt);
654         rnbd_put_iu(sess, iu);
655         return err;
656 }
657
658 static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess)
659 {
660         struct rnbd_clt_dev *dev;
661
662         mutex_lock(&sess->lock);
663         list_for_each_entry(dev, &sess->devs_list, list) {
664                 rnbd_clt_err(dev, "Device disconnected.\n");
665
666                 mutex_lock(&dev->lock);
667                 if (dev->dev_state == DEV_STATE_MAPPED)
668                         dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED;
669                 mutex_unlock(&dev->lock);
670         }
671         mutex_unlock(&sess->lock);
672 }
673
674 static void remap_devs(struct rnbd_clt_session *sess)
675 {
676         struct rnbd_clt_dev *dev;
677         struct rtrs_attrs attrs;
678         int err;
679
680         /*
681          * Careful here: we are called from RTRS link event directly,
682          * thus we can't send any RTRS request and wait for response
683          * or RTRS will not be able to complete request with failure
684          * if something goes wrong (failing of outstanding requests
685          * happens exactly from the context where we are blocking now).
686          *
687          * So to avoid deadlocks each usr message sent from here must
688          * be asynchronous.
689          */
690
691         err = send_msg_sess_info(sess, NO_WAIT);
692         if (err) {
693                 pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err);
694                 return;
695         }
696
697         rtrs_clt_query(sess->rtrs, &attrs);
698         mutex_lock(&sess->lock);
699         sess->max_io_size = attrs.max_io_size;
700
701         list_for_each_entry(dev, &sess->devs_list, list) {
702                 bool skip;
703
704                 mutex_lock(&dev->lock);
705                 skip = (dev->dev_state == DEV_STATE_INIT);
706                 mutex_unlock(&dev->lock);
707                 if (skip)
708                         /*
709                          * When device is establishing connection for the first
710                          * time - do not remap, it will be closed soon.
711                          */
712                         continue;
713
714                 rnbd_clt_info(dev, "session reconnected, remapping device\n");
715                 err = send_msg_open(dev, NO_WAIT);
716                 if (err) {
717                         rnbd_clt_err(dev, "send_msg_open(): %d\n", err);
718                         break;
719                 }
720         }
721         mutex_unlock(&sess->lock);
722 }
723
724 static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev)
725 {
726         struct rnbd_clt_session *sess = priv;
727
728         switch (ev) {
729         case RTRS_CLT_LINK_EV_DISCONNECTED:
730                 set_dev_states_to_disconnected(sess);
731                 break;
732         case RTRS_CLT_LINK_EV_RECONNECTED:
733                 remap_devs(sess);
734                 break;
735         default:
736                 pr_err("Unknown session event received (%d), session: %s\n",
737                        ev, sess->sessname);
738         }
739 }
740
741 static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues)
742 {
743         unsigned int cpu;
744         struct rnbd_cpu_qlist *cpu_q;
745
746         for_each_possible_cpu(cpu) {
747                 cpu_q = per_cpu_ptr(cpu_queues, cpu);
748
749                 cpu_q->cpu = cpu;
750                 INIT_LIST_HEAD(&cpu_q->requeue_list);
751                 spin_lock_init(&cpu_q->requeue_lock);
752         }
753 }
754
755 static void destroy_mq_tags(struct rnbd_clt_session *sess)
756 {
757         if (sess->tag_set.tags)
758                 blk_mq_free_tag_set(&sess->tag_set);
759 }
760
761 static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess)
762 {
763         sess->rtrs_ready = true;
764         wake_up_all(&sess->rtrs_waitq);
765 }
766
767 static void close_rtrs(struct rnbd_clt_session *sess)
768 {
769         might_sleep();
770
771         if (!IS_ERR_OR_NULL(sess->rtrs)) {
772                 rtrs_clt_close(sess->rtrs);
773                 sess->rtrs = NULL;
774                 wake_up_rtrs_waiters(sess);
775         }
776 }
777
778 static void free_sess(struct rnbd_clt_session *sess)
779 {
780         WARN_ON(!list_empty(&sess->devs_list));
781
782         might_sleep();
783
784         close_rtrs(sess);
785         destroy_mq_tags(sess);
786         if (!list_empty(&sess->list)) {
787                 mutex_lock(&sess_lock);
788                 list_del(&sess->list);
789                 mutex_unlock(&sess_lock);
790         }
791         free_percpu(sess->cpu_queues);
792         free_percpu(sess->cpu_rr);
793         mutex_destroy(&sess->lock);
794         kfree(sess);
795 }
796
797 static struct rnbd_clt_session *alloc_sess(const char *sessname)
798 {
799         struct rnbd_clt_session *sess;
800         int err, cpu;
801
802         sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE);
803         if (!sess)
804                 return ERR_PTR(-ENOMEM);
805         strlcpy(sess->sessname, sessname, sizeof(sess->sessname));
806         atomic_set(&sess->busy, 0);
807         mutex_init(&sess->lock);
808         INIT_LIST_HEAD(&sess->devs_list);
809         INIT_LIST_HEAD(&sess->list);
810         bitmap_zero(sess->cpu_queues_bm, NR_CPUS);
811         init_waitqueue_head(&sess->rtrs_waitq);
812         refcount_set(&sess->refcount, 1);
813
814         sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist);
815         if (!sess->cpu_queues) {
816                 err = -ENOMEM;
817                 goto err;
818         }
819         rnbd_init_cpu_qlists(sess->cpu_queues);
820
821         /*
822          * That is simple percpu variable which stores cpu indices, which are
823          * incremented on each access.  We need that for the sake of fairness
824          * to wake up queues in a round-robin manner.
825          */
826         sess->cpu_rr = alloc_percpu(int);
827         if (!sess->cpu_rr) {
828                 err = -ENOMEM;
829                 goto err;
830         }
831         for_each_possible_cpu(cpu)
832                 * per_cpu_ptr(sess->cpu_rr, cpu) = cpu;
833
834         return sess;
835
836 err:
837         free_sess(sess);
838
839         return ERR_PTR(err);
840 }
841
842 static int wait_for_rtrs_connection(struct rnbd_clt_session *sess)
843 {
844         wait_event(sess->rtrs_waitq, sess->rtrs_ready);
845         if (IS_ERR_OR_NULL(sess->rtrs))
846                 return -ECONNRESET;
847
848         return 0;
849 }
850
851 static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess)
852         __releases(&sess_lock)
853         __acquires(&sess_lock)
854 {
855         DEFINE_WAIT(wait);
856
857         prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE);
858         if (IS_ERR_OR_NULL(sess->rtrs)) {
859                 finish_wait(&sess->rtrs_waitq, &wait);
860                 return;
861         }
862         mutex_unlock(&sess_lock);
863         /* loop in caller, see __find_and_get_sess().
864          * You can't leave mutex locked and call schedule(), you will catch a
865          * deadlock with a caller of free_sess(), which has just put the last
866          * reference and is about to take the sess_lock in order to delete
867          * the session from the list.
868          */
869         schedule();
870         mutex_lock(&sess_lock);
871 }
872
873 static struct rnbd_clt_session *__find_and_get_sess(const char *sessname)
874         __releases(&sess_lock)
875         __acquires(&sess_lock)
876 {
877         struct rnbd_clt_session *sess, *sn;
878         int err;
879
880 again:
881         list_for_each_entry_safe(sess, sn, &sess_list, list) {
882                 if (strcmp(sessname, sess->sessname))
883                         continue;
884
885                 if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs))
886                         /*
887                          * No RTRS connection, session is dying.
888                          */
889                         continue;
890
891                 if (rnbd_clt_get_sess(sess)) {
892                         /*
893                          * Alive session is found, wait for RTRS connection.
894                          */
895                         mutex_unlock(&sess_lock);
896                         err = wait_for_rtrs_connection(sess);
897                         if (err)
898                                 rnbd_clt_put_sess(sess);
899                         mutex_lock(&sess_lock);
900
901                         if (err)
902                                 /* Session is dying, repeat the loop */
903                                 goto again;
904
905                         return sess;
906                 }
907                 /*
908                  * Ref is 0, session is dying, wait for RTRS disconnect
909                  * in order to avoid session names clashes.
910                  */
911                 wait_for_rtrs_disconnection(sess);
912                 /*
913                  * RTRS is disconnected and soon session will be freed,
914                  * so repeat a loop.
915                  */
916                 goto again;
917         }
918
919         return NULL;
920 }
921
922 static struct
923 rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first)
924 {
925         struct rnbd_clt_session *sess = NULL;
926
927         mutex_lock(&sess_lock);
928         sess = __find_and_get_sess(sessname);
929         if (!sess) {
930                 sess = alloc_sess(sessname);
931                 if (IS_ERR(sess)) {
932                         mutex_unlock(&sess_lock);
933                         return sess;
934                 }
935                 list_add(&sess->list, &sess_list);
936                 *first = true;
937         } else
938                 *first = false;
939         mutex_unlock(&sess_lock);
940
941         return sess;
942 }
943
944 static int rnbd_client_open(struct block_device *block_device, fmode_t mode)
945 {
946         struct rnbd_clt_dev *dev = block_device->bd_disk->private_data;
947
948         if (dev->read_only && (mode & FMODE_WRITE))
949                 return -EPERM;
950
951         if (dev->dev_state == DEV_STATE_UNMAPPED ||
952             !rnbd_clt_get_dev(dev))
953                 return -EIO;
954
955         return 0;
956 }
957
958 static void rnbd_client_release(struct gendisk *gen, fmode_t mode)
959 {
960         struct rnbd_clt_dev *dev = gen->private_data;
961
962         rnbd_clt_put_dev(dev);
963 }
964
965 static int rnbd_client_getgeo(struct block_device *block_device,
966                               struct hd_geometry *geo)
967 {
968         u64 size;
969         struct rnbd_clt_dev *dev;
970
971         dev = block_device->bd_disk->private_data;
972         size = dev->size * (dev->logical_block_size / SECTOR_SIZE);
973         geo->cylinders  = size >> 6;    /* size/64 */
974         geo->heads      = 4;
975         geo->sectors    = 16;
976         geo->start      = 0;
977
978         return 0;
979 }
980
981 static const struct block_device_operations rnbd_client_ops = {
982         .owner          = THIS_MODULE,
983         .open           = rnbd_client_open,
984         .release        = rnbd_client_release,
985         .getgeo         = rnbd_client_getgeo
986 };
987
988 /* The amount of data that belongs to an I/O and the amount of data that
989  * should be read or written to the disk (bi_size) can differ.
990  *
991  * E.g. When WRITE_SAME is used, only a small amount of data is
992  * transferred that is then written repeatedly over a lot of sectors.
993  *
994  * Get the size of data to be transferred via RTRS by summing up the size
995  * of the scather-gather list entries.
996  */
997 static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len)
998 {
999         struct scatterlist *sg;
1000         size_t tsize = 0;
1001         int i;
1002
1003         for_each_sg(sglist, sg, len, i)
1004                 tsize += sg->length;
1005         return tsize;
1006 }
1007
1008 static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev,
1009                                      struct request *rq,
1010                                      struct rnbd_iu *iu)
1011 {
1012         struct rtrs_clt *rtrs = dev->sess->rtrs;
1013         struct rtrs_permit *permit = iu->permit;
1014         struct rnbd_msg_io msg;
1015         struct rtrs_clt_req_ops req_ops;
1016         unsigned int sg_cnt = 0;
1017         struct kvec vec;
1018         size_t size;
1019         int err;
1020
1021         iu->rq          = rq;
1022         iu->dev         = dev;
1023         msg.sector      = cpu_to_le64(blk_rq_pos(rq));
1024         msg.bi_size     = cpu_to_le32(blk_rq_bytes(rq));
1025         msg.rw          = cpu_to_le32(rq_to_rnbd_flags(rq));
1026         msg.prio        = cpu_to_le16(req_get_ioprio(rq));
1027
1028         /*
1029          * We only support discards with single segment for now.
1030          * See queue limits.
1031          */
1032         if (req_op(rq) != REQ_OP_DISCARD)
1033                 sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sgt.sgl);
1034
1035         if (sg_cnt == 0)
1036                 sg_mark_end(&iu->sgt.sgl[0]);
1037
1038         msg.hdr.type    = cpu_to_le16(RNBD_MSG_IO);
1039         msg.device_id   = cpu_to_le32(dev->device_id);
1040
1041         vec = (struct kvec) {
1042                 .iov_base = &msg,
1043                 .iov_len  = sizeof(msg)
1044         };
1045         size = rnbd_clt_get_sg_size(iu->sgt.sgl, sg_cnt);
1046         req_ops = (struct rtrs_clt_req_ops) {
1047                 .priv = iu,
1048                 .conf_fn = msg_io_conf,
1049         };
1050         err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit,
1051                                &vec, 1, size, iu->sgt.sgl, sg_cnt);
1052         if (unlikely(err)) {
1053                 rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n",
1054                                  err);
1055                 return err;
1056         }
1057
1058         return 0;
1059 }
1060
1061 /**
1062  * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy
1063  * @dev:        Device to be checked
1064  * @q:          Queue to be added to the requeue list if required
1065  *
1066  * Description:
1067  *     If session is busy, that means someone will requeue us when resources
1068  *     are freed.  If session is not doing anything - device is not added to
1069  *     the list and @false is returned.
1070  */
1071 static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev,
1072                                                 struct rnbd_queue *q)
1073 {
1074         struct rnbd_clt_session *sess = dev->sess;
1075         struct rnbd_cpu_qlist *cpu_q;
1076         unsigned long flags;
1077         bool added = true;
1078         bool need_set;
1079
1080         cpu_q = get_cpu_ptr(sess->cpu_queues);
1081         spin_lock_irqsave(&cpu_q->requeue_lock, flags);
1082
1083         if (likely(!test_and_set_bit_lock(0, &q->in_list))) {
1084                 if (WARN_ON(!list_empty(&q->requeue_list)))
1085                         goto unlock;
1086
1087                 need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm);
1088                 if (need_set) {
1089                         set_bit(cpu_q->cpu, sess->cpu_queues_bm);
1090                         /* Paired with rnbd_put_permit(). Set a bit first
1091                          * and then observe the busy counter.
1092                          */
1093                         smp_mb__before_atomic();
1094                 }
1095                 if (likely(atomic_read(&sess->busy))) {
1096                         list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
1097                 } else {
1098                         /* Very unlikely, but possible: busy counter was
1099                          * observed as zero.  Drop all bits and return
1100                          * false to restart the queue by ourselves.
1101                          */
1102                         if (need_set)
1103                                 clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
1104                         clear_bit_unlock(0, &q->in_list);
1105                         added = false;
1106                 }
1107         }
1108 unlock:
1109         spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
1110         put_cpu_ptr(sess->cpu_queues);
1111
1112         return added;
1113 }
1114
1115 static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev,
1116                                         struct blk_mq_hw_ctx *hctx,
1117                                         int delay)
1118 {
1119         struct rnbd_queue *q = hctx->driver_data;
1120
1121         if (delay != RNBD_DELAY_IFBUSY)
1122                 blk_mq_delay_run_hw_queue(hctx, delay);
1123         else if (unlikely(!rnbd_clt_dev_add_to_requeue(dev, q)))
1124                 /*
1125                  * If session is not busy we have to restart
1126                  * the queue ourselves.
1127                  */
1128                 blk_mq_delay_run_hw_queue(hctx, 10/*ms*/);
1129 }
1130
1131 static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx,
1132                                    const struct blk_mq_queue_data *bd)
1133 {
1134         struct request *rq = bd->rq;
1135         struct rnbd_clt_dev *dev = rq->rq_disk->private_data;
1136         struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
1137         int err;
1138         blk_status_t ret = BLK_STS_IOERR;
1139
1140         if (unlikely(dev->dev_state != DEV_STATE_MAPPED))
1141                 return BLK_STS_IOERR;
1142
1143         iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON,
1144                                       RTRS_PERMIT_NOWAIT);
1145         if (unlikely(!iu->permit)) {
1146                 rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY);
1147                 return BLK_STS_RESOURCE;
1148         }
1149
1150         iu->sgt.sgl = iu->first_sgl;
1151         err = sg_alloc_table_chained(&iu->sgt,
1152                                      /* Even-if the request has no segment,
1153                                       * sglist must have one entry at least */
1154                                      blk_rq_nr_phys_segments(rq) ? : 1,
1155                                      iu->sgt.sgl,
1156                                      RNBD_INLINE_SG_CNT);
1157         if (err) {
1158                 rnbd_clt_err_rl(dev, "sg_alloc_table_chained ret=%d\n", err);
1159                 rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
1160                 rnbd_put_permit(dev->sess, iu->permit);
1161                 return BLK_STS_RESOURCE;
1162         }
1163
1164         blk_mq_start_request(rq);
1165         err = rnbd_client_xfer_request(dev, rq, iu);
1166         if (likely(err == 0))
1167                 return BLK_STS_OK;
1168         if (unlikely(err == -EAGAIN || err == -ENOMEM)) {
1169                 rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
1170                 ret = BLK_STS_RESOURCE;
1171         }
1172         sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
1173         rnbd_put_permit(dev->sess, iu->permit);
1174         return ret;
1175 }
1176
1177 static struct blk_mq_ops rnbd_mq_ops = {
1178         .queue_rq       = rnbd_queue_rq,
1179         .complete       = rnbd_softirq_done_fn,
1180 };
1181
1182 static int setup_mq_tags(struct rnbd_clt_session *sess)
1183 {
1184         struct blk_mq_tag_set *tag_set = &sess->tag_set;
1185
1186         memset(tag_set, 0, sizeof(*tag_set));
1187         tag_set->ops            = &rnbd_mq_ops;
1188         tag_set->queue_depth    = sess->queue_depth;
1189         tag_set->numa_node              = NUMA_NO_NODE;
1190         tag_set->flags          = BLK_MQ_F_SHOULD_MERGE |
1191                                   BLK_MQ_F_TAG_QUEUE_SHARED;
1192         tag_set->cmd_size       = sizeof(struct rnbd_iu) + RNBD_RDMA_SGL_SIZE;
1193         tag_set->nr_hw_queues   = num_online_cpus();
1194
1195         return blk_mq_alloc_tag_set(tag_set);
1196 }
1197
1198 static struct rnbd_clt_session *
1199 find_and_get_or_create_sess(const char *sessname,
1200                             const struct rtrs_addr *paths,
1201                             size_t path_cnt, u16 port_nr)
1202 {
1203         struct rnbd_clt_session *sess;
1204         struct rtrs_attrs attrs;
1205         int err;
1206         bool first;
1207         struct rtrs_clt_ops rtrs_ops;
1208
1209         sess = find_or_create_sess(sessname, &first);
1210         if (sess == ERR_PTR(-ENOMEM))
1211                 return ERR_PTR(-ENOMEM);
1212         else if (!first)
1213                 return sess;
1214
1215         if (!path_cnt) {
1216                 pr_err("Session %s not found, and path parameter not given", sessname);
1217                 err = -ENXIO;
1218                 goto put_sess;
1219         }
1220
1221         rtrs_ops = (struct rtrs_clt_ops) {
1222                 .priv = sess,
1223                 .link_ev = rnbd_clt_link_ev,
1224         };
1225         /*
1226          * Nothing was found, establish rtrs connection and proceed further.
1227          */
1228         sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
1229                                    paths, path_cnt, port_nr,
1230                                    0, /* Do not use pdu of rtrs */
1231                                    RECONNECT_DELAY, BMAX_SEGMENTS,
1232                                    BLK_MAX_SEGMENT_SIZE,
1233                                    MAX_RECONNECTS);
1234         if (IS_ERR(sess->rtrs)) {
1235                 err = PTR_ERR(sess->rtrs);
1236                 goto wake_up_and_put;
1237         }
1238         rtrs_clt_query(sess->rtrs, &attrs);
1239         sess->max_io_size = attrs.max_io_size;
1240         sess->queue_depth = attrs.queue_depth;
1241
1242         err = setup_mq_tags(sess);
1243         if (err)
1244                 goto close_rtrs;
1245
1246         err = send_msg_sess_info(sess, WAIT);
1247         if (err)
1248                 goto close_rtrs;
1249
1250         wake_up_rtrs_waiters(sess);
1251
1252         return sess;
1253
1254 close_rtrs:
1255         close_rtrs(sess);
1256 put_sess:
1257         rnbd_clt_put_sess(sess);
1258
1259         return ERR_PTR(err);
1260
1261 wake_up_and_put:
1262         wake_up_rtrs_waiters(sess);
1263         goto put_sess;
1264 }
1265
1266 static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev,
1267                                        struct rnbd_queue *q,
1268                                        struct blk_mq_hw_ctx *hctx)
1269 {
1270         INIT_LIST_HEAD(&q->requeue_list);
1271         q->dev  = dev;
1272         q->hctx = hctx;
1273 }
1274
1275 static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev)
1276 {
1277         int i;
1278         struct blk_mq_hw_ctx *hctx;
1279         struct rnbd_queue *q;
1280
1281         queue_for_each_hw_ctx(dev->queue, hctx, i) {
1282                 q = &dev->hw_queues[i];
1283                 rnbd_init_hw_queue(dev, q, hctx);
1284                 hctx->driver_data = q;
1285         }
1286 }
1287
1288 static int setup_mq_dev(struct rnbd_clt_dev *dev)
1289 {
1290         dev->queue = blk_mq_init_queue(&dev->sess->tag_set);
1291         if (IS_ERR(dev->queue)) {
1292                 rnbd_clt_err(dev, "Initializing multiqueue queue failed, err: %ld\n",
1293                               PTR_ERR(dev->queue));
1294                 return PTR_ERR(dev->queue);
1295         }
1296         rnbd_init_mq_hw_queues(dev);
1297         return 0;
1298 }
1299
1300 static void setup_request_queue(struct rnbd_clt_dev *dev)
1301 {
1302         blk_queue_logical_block_size(dev->queue, dev->logical_block_size);
1303         blk_queue_physical_block_size(dev->queue, dev->physical_block_size);
1304         blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors);
1305         blk_queue_max_write_same_sectors(dev->queue,
1306                                          dev->max_write_same_sectors);
1307
1308         /*
1309          * we don't support discards to "discontiguous" segments
1310          * in on request
1311          */
1312         blk_queue_max_discard_segments(dev->queue, 1);
1313
1314         blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors);
1315         dev->queue->limits.discard_granularity  = dev->discard_granularity;
1316         dev->queue->limits.discard_alignment    = dev->discard_alignment;
1317         if (dev->max_discard_sectors)
1318                 blk_queue_flag_set(QUEUE_FLAG_DISCARD, dev->queue);
1319         if (dev->secure_discard)
1320                 blk_queue_flag_set(QUEUE_FLAG_SECERASE, dev->queue);
1321
1322         blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue);
1323         blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue);
1324         blk_queue_max_segments(dev->queue, dev->max_segments);
1325         blk_queue_io_opt(dev->queue, dev->sess->max_io_size);
1326         blk_queue_virt_boundary(dev->queue, SZ_4K - 1);
1327         blk_queue_write_cache(dev->queue, dev->wc, dev->fua);
1328         dev->queue->queuedata = dev;
1329 }
1330
1331 static void rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx)
1332 {
1333         dev->gd->major          = rnbd_client_major;
1334         dev->gd->first_minor    = idx << RNBD_PART_BITS;
1335         dev->gd->fops           = &rnbd_client_ops;
1336         dev->gd->queue          = dev->queue;
1337         dev->gd->private_data   = dev;
1338         snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d",
1339                  idx);
1340         pr_debug("disk_name=%s, capacity=%zu\n",
1341                  dev->gd->disk_name,
1342                  dev->nsectors * (dev->logical_block_size / SECTOR_SIZE)
1343                  );
1344
1345         set_capacity(dev->gd, dev->nsectors);
1346
1347         if (dev->access_mode == RNBD_ACCESS_RO) {
1348                 dev->read_only = true;
1349                 set_disk_ro(dev->gd, true);
1350         } else {
1351                 dev->read_only = false;
1352         }
1353
1354         if (!dev->rotational)
1355                 blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue);
1356 }
1357
1358 static int rnbd_client_setup_device(struct rnbd_clt_session *sess,
1359                                      struct rnbd_clt_dev *dev, int idx)
1360 {
1361         int err;
1362
1363         dev->size = dev->nsectors * dev->logical_block_size;
1364
1365         err = setup_mq_dev(dev);
1366         if (err)
1367                 return err;
1368
1369         setup_request_queue(dev);
1370
1371         dev->gd = alloc_disk_node(1 << RNBD_PART_BITS,  NUMA_NO_NODE);
1372         if (!dev->gd) {
1373                 blk_cleanup_queue(dev->queue);
1374                 return -ENOMEM;
1375         }
1376
1377         rnbd_clt_setup_gen_disk(dev, idx);
1378
1379         return 0;
1380 }
1381
1382 static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess,
1383                                       enum rnbd_access_mode access_mode,
1384                                       const char *pathname)
1385 {
1386         struct rnbd_clt_dev *dev;
1387         int ret;
1388
1389         dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE);
1390         if (!dev)
1391                 return ERR_PTR(-ENOMEM);
1392
1393         dev->hw_queues = kcalloc(nr_cpu_ids, sizeof(*dev->hw_queues),
1394                                  GFP_KERNEL);
1395         if (!dev->hw_queues) {
1396                 ret = -ENOMEM;
1397                 goto out_alloc;
1398         }
1399
1400         mutex_lock(&ida_lock);
1401         ret = ida_simple_get(&index_ida, 0, 1 << (MINORBITS - RNBD_PART_BITS),
1402                              GFP_KERNEL);
1403         mutex_unlock(&ida_lock);
1404         if (ret < 0) {
1405                 pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n",
1406                        pathname, sess->sessname, ret);
1407                 goto out_queues;
1408         }
1409
1410         dev->pathname = kstrdup(pathname, GFP_KERNEL);
1411         if (!dev->pathname) {
1412                 ret = -ENOMEM;
1413                 goto out_queues;
1414         }
1415
1416         dev->clt_device_id      = ret;
1417         dev->sess               = sess;
1418         dev->access_mode        = access_mode;
1419         mutex_init(&dev->lock);
1420         refcount_set(&dev->refcount, 1);
1421         dev->dev_state = DEV_STATE_INIT;
1422
1423         /*
1424          * Here we called from sysfs entry, thus clt-sysfs is
1425          * responsible that session will not disappear.
1426          */
1427         WARN_ON(!rnbd_clt_get_sess(sess));
1428
1429         return dev;
1430
1431 out_queues:
1432         kfree(dev->hw_queues);
1433 out_alloc:
1434         kfree(dev);
1435         return ERR_PTR(ret);
1436 }
1437
1438 static bool __exists_dev(const char *pathname, const char *sessname)
1439 {
1440         struct rnbd_clt_session *sess;
1441         struct rnbd_clt_dev *dev;
1442         bool found = false;
1443
1444         list_for_each_entry(sess, &sess_list, list) {
1445                 if (sessname && strncmp(sess->sessname, sessname,
1446                                         sizeof(sess->sessname)))
1447                         continue;
1448                 mutex_lock(&sess->lock);
1449                 list_for_each_entry(dev, &sess->devs_list, list) {
1450                         if (strlen(dev->pathname) == strlen(pathname) &&
1451                             !strcmp(dev->pathname, pathname)) {
1452                                 found = true;
1453                                 break;
1454                         }
1455                 }
1456                 mutex_unlock(&sess->lock);
1457                 if (found)
1458                         break;
1459         }
1460
1461         return found;
1462 }
1463
1464 static bool exists_devpath(const char *pathname, const char *sessname)
1465 {
1466         bool found;
1467
1468         mutex_lock(&sess_lock);
1469         found = __exists_dev(pathname, sessname);
1470         mutex_unlock(&sess_lock);
1471
1472         return found;
1473 }
1474
1475 static bool insert_dev_if_not_exists_devpath(const char *pathname,
1476                                              struct rnbd_clt_session *sess,
1477                                              struct rnbd_clt_dev *dev)
1478 {
1479         bool found;
1480
1481         mutex_lock(&sess_lock);
1482         found = __exists_dev(pathname, sess->sessname);
1483         if (!found) {
1484                 mutex_lock(&sess->lock);
1485                 list_add_tail(&dev->list, &sess->devs_list);
1486                 mutex_unlock(&sess->lock);
1487         }
1488         mutex_unlock(&sess_lock);
1489
1490         return found;
1491 }
1492
1493 static void delete_dev(struct rnbd_clt_dev *dev)
1494 {
1495         struct rnbd_clt_session *sess = dev->sess;
1496
1497         mutex_lock(&sess->lock);
1498         list_del(&dev->list);
1499         mutex_unlock(&sess->lock);
1500 }
1501
1502 struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname,
1503                                            struct rtrs_addr *paths,
1504                                            size_t path_cnt, u16 port_nr,
1505                                            const char *pathname,
1506                                            enum rnbd_access_mode access_mode)
1507 {
1508         struct rnbd_clt_session *sess;
1509         struct rnbd_clt_dev *dev;
1510         int ret;
1511
1512         if (unlikely(exists_devpath(pathname, sessname)))
1513                 return ERR_PTR(-EEXIST);
1514
1515         sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr);
1516         if (IS_ERR(sess))
1517                 return ERR_CAST(sess);
1518
1519         dev = init_dev(sess, access_mode, pathname);
1520         if (IS_ERR(dev)) {
1521                 pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n",
1522                        pathname, sess->sessname, PTR_ERR(dev));
1523                 ret = PTR_ERR(dev);
1524                 goto put_sess;
1525         }
1526         if (insert_dev_if_not_exists_devpath(pathname, sess, dev)) {
1527                 ret = -EEXIST;
1528                 goto put_dev;
1529         }
1530         ret = send_msg_open(dev, WAIT);
1531         if (ret) {
1532                 rnbd_clt_err(dev,
1533                               "map_device: failed, can't open remote device, err: %d\n",
1534                               ret);
1535                 goto del_dev;
1536         }
1537         mutex_lock(&dev->lock);
1538         pr_debug("Opened remote device: session=%s, path='%s'\n",
1539                  sess->sessname, pathname);
1540         ret = rnbd_client_setup_device(sess, dev, dev->clt_device_id);
1541         if (ret) {
1542                 rnbd_clt_err(dev,
1543                               "map_device: Failed to configure device, err: %d\n",
1544                               ret);
1545                 mutex_unlock(&dev->lock);
1546                 goto send_close;
1547         }
1548
1549         rnbd_clt_info(dev,
1550                        "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_write_same_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, rotational: %d, wc: %d, fua: %d)\n",
1551                        dev->gd->disk_name, dev->nsectors,
1552                        dev->logical_block_size, dev->physical_block_size,
1553                        dev->max_write_same_sectors, dev->max_discard_sectors,
1554                        dev->discard_granularity, dev->discard_alignment,
1555                        dev->secure_discard, dev->max_segments,
1556                        dev->max_hw_sectors, dev->rotational, dev->wc, dev->fua);
1557
1558         mutex_unlock(&dev->lock);
1559
1560         add_disk(dev->gd);
1561         rnbd_clt_put_sess(sess);
1562
1563         return dev;
1564
1565 send_close:
1566         send_msg_close(dev, dev->device_id, WAIT);
1567 del_dev:
1568         delete_dev(dev);
1569 put_dev:
1570         rnbd_clt_put_dev(dev);
1571 put_sess:
1572         rnbd_clt_put_sess(sess);
1573
1574         return ERR_PTR(ret);
1575 }
1576
1577 static void destroy_gen_disk(struct rnbd_clt_dev *dev)
1578 {
1579         del_gendisk(dev->gd);
1580         blk_cleanup_queue(dev->queue);
1581         put_disk(dev->gd);
1582 }
1583
1584 static void destroy_sysfs(struct rnbd_clt_dev *dev,
1585                           const struct attribute *sysfs_self)
1586 {
1587         rnbd_clt_remove_dev_symlink(dev);
1588         if (dev->kobj.state_initialized) {
1589                 if (sysfs_self)
1590                         /* To avoid deadlock firstly remove itself */
1591                         sysfs_remove_file_self(&dev->kobj, sysfs_self);
1592                 kobject_del(&dev->kobj);
1593                 kobject_put(&dev->kobj);
1594         }
1595 }
1596
1597 int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force,
1598                            const struct attribute *sysfs_self)
1599 {
1600         struct rnbd_clt_session *sess = dev->sess;
1601         int refcount, ret = 0;
1602         bool was_mapped;
1603
1604         mutex_lock(&dev->lock);
1605         if (dev->dev_state == DEV_STATE_UNMAPPED) {
1606                 rnbd_clt_info(dev, "Device is already being unmapped\n");
1607                 ret = -EALREADY;
1608                 goto err;
1609         }
1610         refcount = refcount_read(&dev->refcount);
1611         if (!force && refcount > 1) {
1612                 rnbd_clt_err(dev,
1613                               "Closing device failed, device is in use, (%d device users)\n",
1614                               refcount - 1);
1615                 ret = -EBUSY;
1616                 goto err;
1617         }
1618         was_mapped = (dev->dev_state == DEV_STATE_MAPPED);
1619         dev->dev_state = DEV_STATE_UNMAPPED;
1620         mutex_unlock(&dev->lock);
1621
1622         delete_dev(dev);
1623         destroy_sysfs(dev, sysfs_self);
1624         destroy_gen_disk(dev);
1625         if (was_mapped && sess->rtrs)
1626                 send_msg_close(dev, dev->device_id, WAIT);
1627
1628         rnbd_clt_info(dev, "Device is unmapped\n");
1629
1630         /* Likely last reference put */
1631         rnbd_clt_put_dev(dev);
1632
1633         /*
1634          * Here device and session can be vanished!
1635          */
1636
1637         return 0;
1638 err:
1639         mutex_unlock(&dev->lock);
1640
1641         return ret;
1642 }
1643
1644 int rnbd_clt_remap_device(struct rnbd_clt_dev *dev)
1645 {
1646         int err;
1647
1648         mutex_lock(&dev->lock);
1649         if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED)
1650                 err = 0;
1651         else if (dev->dev_state == DEV_STATE_UNMAPPED)
1652                 err = -ENODEV;
1653         else if (dev->dev_state == DEV_STATE_MAPPED)
1654                 err = -EALREADY;
1655         else
1656                 err = -EBUSY;
1657         mutex_unlock(&dev->lock);
1658         if (!err) {
1659                 rnbd_clt_info(dev, "Remapping device.\n");
1660                 err = send_msg_open(dev, WAIT);
1661                 if (err)
1662                         rnbd_clt_err(dev, "remap_device: %d\n", err);
1663         }
1664
1665         return err;
1666 }
1667
1668 static void unmap_device_work(struct work_struct *work)
1669 {
1670         struct rnbd_clt_dev *dev;
1671
1672         dev = container_of(work, typeof(*dev), unmap_on_rmmod_work);
1673         rnbd_clt_unmap_device(dev, true, NULL);
1674 }
1675
1676 static void rnbd_destroy_sessions(void)
1677 {
1678         struct rnbd_clt_session *sess, *sn;
1679         struct rnbd_clt_dev *dev, *tn;
1680
1681         /* Firstly forbid access through sysfs interface */
1682         rnbd_clt_destroy_default_group();
1683         rnbd_clt_destroy_sysfs_files();
1684
1685         /*
1686          * Here at this point there is no any concurrent access to sessions
1687          * list and devices list:
1688          *   1. New session or device can't be created - session sysfs files
1689          *      are removed.
1690          *   2. Device or session can't be removed - module reference is taken
1691          *      into account in unmap device sysfs callback.
1692          *   3. No IO requests inflight - each file open of block_dev increases
1693          *      module reference in get_disk().
1694          *
1695          * But still there can be user requests inflights, which are sent by
1696          * asynchronous send_msg_*() functions, thus before unmapping devices
1697          * RTRS session must be explicitly closed.
1698          */
1699
1700         list_for_each_entry_safe(sess, sn, &sess_list, list) {
1701                 WARN_ON(!rnbd_clt_get_sess(sess));
1702                 close_rtrs(sess);
1703                 list_for_each_entry_safe(dev, tn, &sess->devs_list, list) {
1704                         /*
1705                          * Here unmap happens in parallel for only one reason:
1706                          * blk_cleanup_queue() takes around half a second, so
1707                          * on huge amount of devices the whole module unload
1708                          * procedure takes minutes.
1709                          */
1710                         INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work);
1711                         queue_work(system_long_wq, &dev->unmap_on_rmmod_work);
1712                 }
1713                 rnbd_clt_put_sess(sess);
1714         }
1715         /* Wait for all scheduled unmap works */
1716         flush_workqueue(system_long_wq);
1717         WARN_ON(!list_empty(&sess_list));
1718 }
1719
1720 static int __init rnbd_client_init(void)
1721 {
1722         int err = 0;
1723
1724         BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4);
1725         BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36);
1726         BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36);
1727         BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264);
1728         BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8);
1729         BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56);
1730         rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd");
1731         if (rnbd_client_major <= 0) {
1732                 pr_err("Failed to load module, block device registration failed\n");
1733                 return -EBUSY;
1734         }
1735
1736         err = rnbd_clt_create_sysfs_files();
1737         if (err) {
1738                 pr_err("Failed to load module, creating sysfs device files failed, err: %d\n",
1739                        err);
1740                 unregister_blkdev(rnbd_client_major, "rnbd");
1741         }
1742
1743         return err;
1744 }
1745
1746 static void __exit rnbd_client_exit(void)
1747 {
1748         rnbd_destroy_sessions();
1749         unregister_blkdev(rnbd_client_major, "rnbd");
1750         ida_destroy(&index_ida);
1751 }
1752
1753 module_init(rnbd_client_init);
1754 module_exit(rnbd_client_exit);