Merge tag 'riscv-for-linus-5.11-mw0' of git://git.kernel.org/pub/scm/linux/kernel...
[linux-2.6-microblaze.git] / drivers / block / rnbd / rnbd-clt.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  * RDMA Network Block Driver
4  *
5  * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6  * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7  * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8  */
9
10 #undef pr_fmt
11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12
13 #include <linux/module.h>
14 #include <linux/blkdev.h>
15 #include <linux/hdreg.h>
16 #include <linux/scatterlist.h>
17 #include <linux/idr.h>
18
19 #include "rnbd-clt.h"
20
21 MODULE_DESCRIPTION("RDMA Network Block Device Client");
22 MODULE_LICENSE("GPL");
23
24 static int rnbd_client_major;
25 static DEFINE_IDA(index_ida);
26 static DEFINE_MUTEX(ida_lock);
27 static DEFINE_MUTEX(sess_lock);
28 static LIST_HEAD(sess_list);
29
30 /*
31  * Maximum number of partitions an instance can have.
32  * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
33  */
34 #define RNBD_PART_BITS          6
35
36 static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
37 {
38         return refcount_inc_not_zero(&sess->refcount);
39 }
40
41 static void free_sess(struct rnbd_clt_session *sess);
42
43 static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
44 {
45         might_sleep();
46
47         if (refcount_dec_and_test(&sess->refcount))
48                 free_sess(sess);
49 }
50
51 static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev)
52 {
53         might_sleep();
54
55         if (!refcount_dec_and_test(&dev->refcount))
56                 return;
57
58         mutex_lock(&ida_lock);
59         ida_simple_remove(&index_ida, dev->clt_device_id);
60         mutex_unlock(&ida_lock);
61         kfree(dev->hw_queues);
62         kfree(dev->pathname);
63         rnbd_clt_put_sess(dev->sess);
64         mutex_destroy(&dev->lock);
65         kfree(dev);
66 }
67
68 static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev)
69 {
70         return refcount_inc_not_zero(&dev->refcount);
71 }
72
73 static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev,
74                                  const struct rnbd_msg_open_rsp *rsp)
75 {
76         struct rnbd_clt_session *sess = dev->sess;
77
78         if (!rsp->logical_block_size)
79                 return -EINVAL;
80
81         dev->device_id              = le32_to_cpu(rsp->device_id);
82         dev->nsectors               = le64_to_cpu(rsp->nsectors);
83         dev->logical_block_size     = le16_to_cpu(rsp->logical_block_size);
84         dev->physical_block_size    = le16_to_cpu(rsp->physical_block_size);
85         dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors);
86         dev->max_discard_sectors    = le32_to_cpu(rsp->max_discard_sectors);
87         dev->discard_granularity    = le32_to_cpu(rsp->discard_granularity);
88         dev->discard_alignment      = le32_to_cpu(rsp->discard_alignment);
89         dev->secure_discard         = le16_to_cpu(rsp->secure_discard);
90         dev->rotational             = rsp->rotational;
91
92         dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE;
93         dev->max_segments = BMAX_SEGMENTS;
94
95         return 0;
96 }
97
98 static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev,
99                                     size_t new_nsectors)
100 {
101         rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n",
102                        dev->nsectors, new_nsectors);
103         dev->nsectors = new_nsectors;
104         set_capacity_and_notify(dev->gd, dev->nsectors);
105         return 0;
106 }
107
108 static int process_msg_open_rsp(struct rnbd_clt_dev *dev,
109                                 struct rnbd_msg_open_rsp *rsp)
110 {
111         int err = 0;
112
113         mutex_lock(&dev->lock);
114         if (dev->dev_state == DEV_STATE_UNMAPPED) {
115                 rnbd_clt_info(dev,
116                                "Ignoring Open-Response message from server for  unmapped device\n");
117                 err = -ENOENT;
118                 goto out;
119         }
120         if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) {
121                 u64 nsectors = le64_to_cpu(rsp->nsectors);
122
123                 /*
124                  * If the device was remapped and the size changed in the
125                  * meantime we need to revalidate it
126                  */
127                 if (dev->nsectors != nsectors)
128                         rnbd_clt_change_capacity(dev, nsectors);
129                 rnbd_clt_info(dev, "Device online, device remapped successfully\n");
130         }
131         err = rnbd_clt_set_dev_attr(dev, rsp);
132         if (err)
133                 goto out;
134         dev->dev_state = DEV_STATE_MAPPED;
135
136 out:
137         mutex_unlock(&dev->lock);
138
139         return err;
140 }
141
142 int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize)
143 {
144         int ret = 0;
145
146         mutex_lock(&dev->lock);
147         if (dev->dev_state != DEV_STATE_MAPPED) {
148                 pr_err("Failed to set new size of the device, device is not opened\n");
149                 ret = -ENOENT;
150                 goto out;
151         }
152         ret = rnbd_clt_change_capacity(dev, newsize);
153
154 out:
155         mutex_unlock(&dev->lock);
156
157         return ret;
158 }
159
160 static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
161 {
162         if (WARN_ON(!q->hctx))
163                 return;
164
165         /* We can come here from interrupt, thus async=true */
166         blk_mq_run_hw_queue(q->hctx, true);
167 }
168
169 enum {
170         RNBD_DELAY_IFBUSY = -1,
171 };
172
173 /**
174  * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun
175  * @sess:       Session to find a queue for
176  * @cpu:        Cpu to start the search from
177  *
178  * Description:
179  *     Each CPU has a list of HW queues, which needs to be rerun.  If a list
180  *     is not empty - it is marked with a bit.  This function finds first
181  *     set bit in a bitmap and returns corresponding CPU list.
182  */
183 static struct rnbd_cpu_qlist *
184 rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
185 {
186         int bit;
187
188         /* Search from cpu to nr_cpu_ids */
189         bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu);
190         if (bit < nr_cpu_ids) {
191                 return per_cpu_ptr(sess->cpu_queues, bit);
192         } else if (cpu != 0) {
193                 /* Search from 0 to cpu */
194                 bit = find_next_bit(sess->cpu_queues_bm, cpu, 0);
195                 if (bit < cpu)
196                         return per_cpu_ptr(sess->cpu_queues, bit);
197         }
198
199         return NULL;
200 }
201
202 static inline int nxt_cpu(int cpu)
203 {
204         return (cpu + 1) % nr_cpu_ids;
205 }
206
207 /**
208  * rnbd_rerun_if_needed() - rerun next queue marked as stopped
209  * @sess:       Session to rerun a queue on
210  *
211  * Description:
212  *     Each CPU has it's own list of HW queues, which should be rerun.
213  *     Function finds such list with HW queues, takes a list lock, picks up
214  *     the first HW queue out of the list and requeues it.
215  *
216  * Return:
217  *     True if the queue was requeued, false otherwise.
218  *
219  * Context:
220  *     Does not matter.
221  */
222 static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)
223 {
224         struct rnbd_queue *q = NULL;
225         struct rnbd_cpu_qlist *cpu_q;
226         unsigned long flags;
227         int *cpup;
228
229         /*
230          * To keep fairness and not to let other queues starve we always
231          * try to wake up someone else in round-robin manner.  That of course
232          * increases latency but queues always have a chance to be executed.
233          */
234         cpup = get_cpu_ptr(sess->cpu_rr);
235         for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
236              cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) {
237                 if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
238                         continue;
239                 if (unlikely(!test_bit(cpu_q->cpu, sess->cpu_queues_bm)))
240                         goto unlock;
241                 q = list_first_entry_or_null(&cpu_q->requeue_list,
242                                              typeof(*q), requeue_list);
243                 if (WARN_ON(!q))
244                         goto clear_bit;
245                 list_del_init(&q->requeue_list);
246                 clear_bit_unlock(0, &q->in_list);
247
248                 if (list_empty(&cpu_q->requeue_list)) {
249                         /* Clear bit if nothing is left */
250 clear_bit:
251                         clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
252                 }
253 unlock:
254                 spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
255
256                 if (q)
257                         break;
258         }
259
260         /**
261          * Saves the CPU that is going to be requeued on the per-cpu var. Just
262          * incrementing it doesn't work because rnbd_get_cpu_qlist() will
263          * always return the first CPU with something on the queue list when the
264          * value stored on the var is greater than the last CPU with something
265          * on the list.
266          */
267         if (cpu_q)
268                 *cpup = cpu_q->cpu;
269         put_cpu_var(sess->cpu_rr);
270
271         if (q)
272                 rnbd_clt_dev_requeue(q);
273
274         return q;
275 }
276
277 /**
278  * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
279  *                               session is idling (there are no requests
280  *                               in-flight).
281  * @sess:       Session to rerun the queues on
282  *
283  * Description:
284  *     This function tries to rerun all stopped queues if there are no
285  *     requests in-flight anymore.  This function tries to solve an obvious
286  *     problem, when number of tags < than number of queues (hctx), which
287  *     are stopped and put to sleep.  If last permit, which has been just put,
288  *     does not wake up all left queues (hctxs), IO requests hang forever.
289  *
290  *     That can happen when all number of permits, say N, have been exhausted
291  *     from one CPU, and we have many block devices per session, say M.
292  *     Each block device has it's own queue (hctx) for each CPU, so eventually
293  *     we can put that number of queues (hctxs) to sleep: M x nr_cpu_ids.
294  *     If number of permits N < M x nr_cpu_ids finally we will get an IO hang.
295  *
296  *     To avoid this hang last caller of rnbd_put_permit() (last caller is the
297  *     one who observes sess->busy == 0) must wake up all remaining queues.
298  *
299  * Context:
300  *     Does not matter.
301  */
302 static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
303 {
304         bool requeued;
305
306         do {
307                 requeued = rnbd_rerun_if_needed(sess);
308         } while (atomic_read(&sess->busy) == 0 && requeued);
309 }
310
311 static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess,
312                                              enum rtrs_clt_con_type con_type,
313                                              int wait)
314 {
315         struct rtrs_permit *permit;
316
317         permit = rtrs_clt_get_permit(sess->rtrs, con_type,
318                                       wait ? RTRS_PERMIT_WAIT :
319                                       RTRS_PERMIT_NOWAIT);
320         if (likely(permit))
321                 /* We have a subtle rare case here, when all permits can be
322                  * consumed before busy counter increased.  This is safe,
323                  * because loser will get NULL as a permit, observe 0 busy
324                  * counter and immediately restart the queue himself.
325                  */
326                 atomic_inc(&sess->busy);
327
328         return permit;
329 }
330
331 static void rnbd_put_permit(struct rnbd_clt_session *sess,
332                              struct rtrs_permit *permit)
333 {
334         rtrs_clt_put_permit(sess->rtrs, permit);
335         atomic_dec(&sess->busy);
336         /* Paired with rnbd_clt_dev_add_to_requeue().  Decrement first
337          * and then check queue bits.
338          */
339         smp_mb__after_atomic();
340         rnbd_rerun_all_if_idle(sess);
341 }
342
343 static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess,
344                                      enum rtrs_clt_con_type con_type,
345                                      int wait)
346 {
347         struct rnbd_iu *iu;
348         struct rtrs_permit *permit;
349
350         permit = rnbd_get_permit(sess, con_type,
351                                   wait ? RTRS_PERMIT_WAIT :
352                                   RTRS_PERMIT_NOWAIT);
353         if (unlikely(!permit))
354                 return NULL;
355         iu = rtrs_permit_to_pdu(permit);
356         iu->permit = permit;
357         /*
358          * 1st reference is dropped after finishing sending a "user" message,
359          * 2nd reference is dropped after confirmation with the response is
360          * returned.
361          * 1st and 2nd can happen in any order, so the rnbd_iu should be
362          * released (rtrs_permit returned to ibbtrs) only leased after both
363          * are finished.
364          */
365         atomic_set(&iu->refcount, 2);
366         init_waitqueue_head(&iu->comp.wait);
367         iu->comp.errno = INT_MAX;
368
369         return iu;
370 }
371
372 static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu)
373 {
374         if (atomic_dec_and_test(&iu->refcount))
375                 rnbd_put_permit(sess, iu->permit);
376 }
377
378 static void rnbd_softirq_done_fn(struct request *rq)
379 {
380         struct rnbd_clt_dev *dev        = rq->rq_disk->private_data;
381         struct rnbd_clt_session *sess   = dev->sess;
382         struct rnbd_iu *iu;
383
384         iu = blk_mq_rq_to_pdu(rq);
385         rnbd_put_permit(sess, iu->permit);
386         blk_mq_end_request(rq, errno_to_blk_status(iu->errno));
387 }
388
389 static void msg_io_conf(void *priv, int errno)
390 {
391         struct rnbd_iu *iu = priv;
392         struct rnbd_clt_dev *dev = iu->dev;
393         struct request *rq = iu->rq;
394         int rw = rq_data_dir(rq);
395
396         iu->errno = errno;
397
398         blk_mq_complete_request(rq);
399
400         if (errno)
401                 rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n",
402                                  rw == READ ? "read" : "write", errno);
403 }
404
405 static void wake_up_iu_comp(struct rnbd_iu *iu, int errno)
406 {
407         iu->comp.errno = errno;
408         wake_up(&iu->comp.wait);
409 }
410
411 static void msg_conf(void *priv, int errno)
412 {
413         struct rnbd_iu *iu = priv;
414
415         iu->errno = errno;
416         schedule_work(&iu->work);
417 }
418
419 enum wait_type {
420         NO_WAIT = 0,
421         WAIT    = 1
422 };
423
424 static int send_usr_msg(struct rtrs_clt *rtrs, int dir,
425                         struct rnbd_iu *iu, struct kvec *vec,
426                         size_t len, struct scatterlist *sg, unsigned int sg_len,
427                         void (*conf)(struct work_struct *work),
428                         int *errno, enum wait_type wait)
429 {
430         int err;
431         struct rtrs_clt_req_ops req_ops;
432
433         INIT_WORK(&iu->work, conf);
434         req_ops = (struct rtrs_clt_req_ops) {
435                 .priv = iu,
436                 .conf_fn = msg_conf,
437         };
438         err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit,
439                                 vec, 1, len, sg, sg_len);
440         if (!err && wait) {
441                 wait_event(iu->comp.wait, iu->comp.errno != INT_MAX);
442                 *errno = iu->comp.errno;
443         } else {
444                 *errno = 0;
445         }
446
447         return err;
448 }
449
450 static void msg_close_conf(struct work_struct *work)
451 {
452         struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
453         struct rnbd_clt_dev *dev = iu->dev;
454
455         wake_up_iu_comp(iu, iu->errno);
456         rnbd_put_iu(dev->sess, iu);
457         rnbd_clt_put_dev(dev);
458 }
459
460 static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, bool wait)
461 {
462         struct rnbd_clt_session *sess = dev->sess;
463         struct rnbd_msg_close msg;
464         struct rnbd_iu *iu;
465         struct kvec vec = {
466                 .iov_base = &msg,
467                 .iov_len  = sizeof(msg)
468         };
469         int err, errno;
470
471         iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
472         if (!iu)
473                 return -ENOMEM;
474
475         iu->buf = NULL;
476         iu->dev = dev;
477
478         sg_mark_end(&iu->sglist[0]);
479
480         msg.hdr.type    = cpu_to_le16(RNBD_MSG_CLOSE);
481         msg.device_id   = cpu_to_le32(device_id);
482
483         WARN_ON(!rnbd_clt_get_dev(dev));
484         err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0,
485                            msg_close_conf, &errno, wait);
486         if (err) {
487                 rnbd_clt_put_dev(dev);
488                 rnbd_put_iu(sess, iu);
489         } else {
490                 err = errno;
491         }
492
493         rnbd_put_iu(sess, iu);
494         return err;
495 }
496
497 static void msg_open_conf(struct work_struct *work)
498 {
499         struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
500         struct rnbd_msg_open_rsp *rsp = iu->buf;
501         struct rnbd_clt_dev *dev = iu->dev;
502         int errno = iu->errno;
503
504         if (errno) {
505                 rnbd_clt_err(dev,
506                               "Opening failed, server responded: %d\n",
507                               errno);
508         } else {
509                 errno = process_msg_open_rsp(dev, rsp);
510                 if (errno) {
511                         u32 device_id = le32_to_cpu(rsp->device_id);
512                         /*
513                          * If server thinks its fine, but we fail to process
514                          * then be nice and send a close to server.
515                          */
516                         (void)send_msg_close(dev, device_id, NO_WAIT);
517                 }
518         }
519         kfree(rsp);
520         wake_up_iu_comp(iu, errno);
521         rnbd_put_iu(dev->sess, iu);
522         rnbd_clt_put_dev(dev);
523 }
524
525 static void msg_sess_info_conf(struct work_struct *work)
526 {
527         struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
528         struct rnbd_msg_sess_info_rsp *rsp = iu->buf;
529         struct rnbd_clt_session *sess = iu->sess;
530
531         if (!iu->errno)
532                 sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR);
533
534         kfree(rsp);
535         wake_up_iu_comp(iu, iu->errno);
536         rnbd_put_iu(sess, iu);
537         rnbd_clt_put_sess(sess);
538 }
539
540 static int send_msg_open(struct rnbd_clt_dev *dev, bool wait)
541 {
542         struct rnbd_clt_session *sess = dev->sess;
543         struct rnbd_msg_open_rsp *rsp;
544         struct rnbd_msg_open msg;
545         struct rnbd_iu *iu;
546         struct kvec vec = {
547                 .iov_base = &msg,
548                 .iov_len  = sizeof(msg)
549         };
550         int err, errno;
551
552         rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
553         if (!rsp)
554                 return -ENOMEM;
555
556         iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
557         if (!iu) {
558                 kfree(rsp);
559                 return -ENOMEM;
560         }
561
562         iu->buf = rsp;
563         iu->dev = dev;
564
565         sg_init_one(iu->sglist, rsp, sizeof(*rsp));
566
567         msg.hdr.type    = cpu_to_le16(RNBD_MSG_OPEN);
568         msg.access_mode = dev->access_mode;
569         strlcpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));
570
571         WARN_ON(!rnbd_clt_get_dev(dev));
572         err = send_usr_msg(sess->rtrs, READ, iu,
573                            &vec, sizeof(*rsp), iu->sglist, 1,
574                            msg_open_conf, &errno, wait);
575         if (err) {
576                 rnbd_clt_put_dev(dev);
577                 rnbd_put_iu(sess, iu);
578                 kfree(rsp);
579         } else {
580                 err = errno;
581         }
582
583         rnbd_put_iu(sess, iu);
584         return err;
585 }
586
587 static int send_msg_sess_info(struct rnbd_clt_session *sess, bool wait)
588 {
589         struct rnbd_msg_sess_info_rsp *rsp;
590         struct rnbd_msg_sess_info msg;
591         struct rnbd_iu *iu;
592         struct kvec vec = {
593                 .iov_base = &msg,
594                 .iov_len  = sizeof(msg)
595         };
596         int err, errno;
597
598         rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
599         if (!rsp)
600                 return -ENOMEM;
601
602         iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
603         if (!iu) {
604                 kfree(rsp);
605                 return -ENOMEM;
606         }
607
608         iu->buf = rsp;
609         iu->sess = sess;
610
611         sg_init_one(iu->sglist, rsp, sizeof(*rsp));
612
613         msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO);
614         msg.ver      = RNBD_PROTO_VER_MAJOR;
615
616         if (!rnbd_clt_get_sess(sess)) {
617                 /*
618                  * That can happen only in one case, when RTRS has restablished
619                  * the connection and link_ev() is called, but session is almost
620                  * dead, last reference on session is put and caller is waiting
621                  * for RTRS to close everything.
622                  */
623                 err = -ENODEV;
624                 goto put_iu;
625         }
626         err = send_usr_msg(sess->rtrs, READ, iu,
627                            &vec, sizeof(*rsp), iu->sglist, 1,
628                            msg_sess_info_conf, &errno, wait);
629         if (err) {
630                 rnbd_clt_put_sess(sess);
631 put_iu:
632                 rnbd_put_iu(sess, iu);
633                 kfree(rsp);
634         } else {
635                 err = errno;
636         }
637
638         rnbd_put_iu(sess, iu);
639         return err;
640 }
641
642 static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess)
643 {
644         struct rnbd_clt_dev *dev;
645
646         mutex_lock(&sess->lock);
647         list_for_each_entry(dev, &sess->devs_list, list) {
648                 rnbd_clt_err(dev, "Device disconnected.\n");
649
650                 mutex_lock(&dev->lock);
651                 if (dev->dev_state == DEV_STATE_MAPPED)
652                         dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED;
653                 mutex_unlock(&dev->lock);
654         }
655         mutex_unlock(&sess->lock);
656 }
657
658 static void remap_devs(struct rnbd_clt_session *sess)
659 {
660         struct rnbd_clt_dev *dev;
661         struct rtrs_attrs attrs;
662         int err;
663
664         /*
665          * Careful here: we are called from RTRS link event directly,
666          * thus we can't send any RTRS request and wait for response
667          * or RTRS will not be able to complete request with failure
668          * if something goes wrong (failing of outstanding requests
669          * happens exactly from the context where we are blocking now).
670          *
671          * So to avoid deadlocks each usr message sent from here must
672          * be asynchronous.
673          */
674
675         err = send_msg_sess_info(sess, NO_WAIT);
676         if (err) {
677                 pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err);
678                 return;
679         }
680
681         rtrs_clt_query(sess->rtrs, &attrs);
682         mutex_lock(&sess->lock);
683         sess->max_io_size = attrs.max_io_size;
684
685         list_for_each_entry(dev, &sess->devs_list, list) {
686                 bool skip;
687
688                 mutex_lock(&dev->lock);
689                 skip = (dev->dev_state == DEV_STATE_INIT);
690                 mutex_unlock(&dev->lock);
691                 if (skip)
692                         /*
693                          * When device is establishing connection for the first
694                          * time - do not remap, it will be closed soon.
695                          */
696                         continue;
697
698                 rnbd_clt_info(dev, "session reconnected, remapping device\n");
699                 err = send_msg_open(dev, NO_WAIT);
700                 if (err) {
701                         rnbd_clt_err(dev, "send_msg_open(): %d\n", err);
702                         break;
703                 }
704         }
705         mutex_unlock(&sess->lock);
706 }
707
708 static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev)
709 {
710         struct rnbd_clt_session *sess = priv;
711
712         switch (ev) {
713         case RTRS_CLT_LINK_EV_DISCONNECTED:
714                 set_dev_states_to_disconnected(sess);
715                 break;
716         case RTRS_CLT_LINK_EV_RECONNECTED:
717                 remap_devs(sess);
718                 break;
719         default:
720                 pr_err("Unknown session event received (%d), session: %s\n",
721                        ev, sess->sessname);
722         }
723 }
724
725 static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues)
726 {
727         unsigned int cpu;
728         struct rnbd_cpu_qlist *cpu_q;
729
730         for_each_possible_cpu(cpu) {
731                 cpu_q = per_cpu_ptr(cpu_queues, cpu);
732
733                 cpu_q->cpu = cpu;
734                 INIT_LIST_HEAD(&cpu_q->requeue_list);
735                 spin_lock_init(&cpu_q->requeue_lock);
736         }
737 }
738
739 static void destroy_mq_tags(struct rnbd_clt_session *sess)
740 {
741         if (sess->tag_set.tags)
742                 blk_mq_free_tag_set(&sess->tag_set);
743 }
744
745 static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess)
746 {
747         sess->rtrs_ready = true;
748         wake_up_all(&sess->rtrs_waitq);
749 }
750
751 static void close_rtrs(struct rnbd_clt_session *sess)
752 {
753         might_sleep();
754
755         if (!IS_ERR_OR_NULL(sess->rtrs)) {
756                 rtrs_clt_close(sess->rtrs);
757                 sess->rtrs = NULL;
758                 wake_up_rtrs_waiters(sess);
759         }
760 }
761
762 static void free_sess(struct rnbd_clt_session *sess)
763 {
764         WARN_ON(!list_empty(&sess->devs_list));
765
766         might_sleep();
767
768         close_rtrs(sess);
769         destroy_mq_tags(sess);
770         if (!list_empty(&sess->list)) {
771                 mutex_lock(&sess_lock);
772                 list_del(&sess->list);
773                 mutex_unlock(&sess_lock);
774         }
775         free_percpu(sess->cpu_queues);
776         free_percpu(sess->cpu_rr);
777         mutex_destroy(&sess->lock);
778         kfree(sess);
779 }
780
781 static struct rnbd_clt_session *alloc_sess(const char *sessname)
782 {
783         struct rnbd_clt_session *sess;
784         int err, cpu;
785
786         sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE);
787         if (!sess)
788                 return ERR_PTR(-ENOMEM);
789         strlcpy(sess->sessname, sessname, sizeof(sess->sessname));
790         atomic_set(&sess->busy, 0);
791         mutex_init(&sess->lock);
792         INIT_LIST_HEAD(&sess->devs_list);
793         INIT_LIST_HEAD(&sess->list);
794         bitmap_zero(sess->cpu_queues_bm, NR_CPUS);
795         init_waitqueue_head(&sess->rtrs_waitq);
796         refcount_set(&sess->refcount, 1);
797
798         sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist);
799         if (!sess->cpu_queues) {
800                 err = -ENOMEM;
801                 goto err;
802         }
803         rnbd_init_cpu_qlists(sess->cpu_queues);
804
805         /*
806          * That is simple percpu variable which stores cpu indeces, which are
807          * incremented on each access.  We need that for the sake of fairness
808          * to wake up queues in a round-robin manner.
809          */
810         sess->cpu_rr = alloc_percpu(int);
811         if (!sess->cpu_rr) {
812                 err = -ENOMEM;
813                 goto err;
814         }
815         for_each_possible_cpu(cpu)
816                 * per_cpu_ptr(sess->cpu_rr, cpu) = cpu;
817
818         return sess;
819
820 err:
821         free_sess(sess);
822
823         return ERR_PTR(err);
824 }
825
826 static int wait_for_rtrs_connection(struct rnbd_clt_session *sess)
827 {
828         wait_event(sess->rtrs_waitq, sess->rtrs_ready);
829         if (IS_ERR_OR_NULL(sess->rtrs))
830                 return -ECONNRESET;
831
832         return 0;
833 }
834
835 static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess)
836         __releases(&sess_lock)
837         __acquires(&sess_lock)
838 {
839         DEFINE_WAIT(wait);
840
841         prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE);
842         if (IS_ERR_OR_NULL(sess->rtrs)) {
843                 finish_wait(&sess->rtrs_waitq, &wait);
844                 return;
845         }
846         mutex_unlock(&sess_lock);
847         /* loop in caller, see __find_and_get_sess().
848          * You can't leave mutex locked and call schedule(), you will catch a
849          * deadlock with a caller of free_sess(), which has just put the last
850          * reference and is about to take the sess_lock in order to delete
851          * the session from the list.
852          */
853         schedule();
854         mutex_lock(&sess_lock);
855 }
856
857 static struct rnbd_clt_session *__find_and_get_sess(const char *sessname)
858         __releases(&sess_lock)
859         __acquires(&sess_lock)
860 {
861         struct rnbd_clt_session *sess, *sn;
862         int err;
863
864 again:
865         list_for_each_entry_safe(sess, sn, &sess_list, list) {
866                 if (strcmp(sessname, sess->sessname))
867                         continue;
868
869                 if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs))
870                         /*
871                          * No RTRS connection, session is dying.
872                          */
873                         continue;
874
875                 if (rnbd_clt_get_sess(sess)) {
876                         /*
877                          * Alive session is found, wait for RTRS connection.
878                          */
879                         mutex_unlock(&sess_lock);
880                         err = wait_for_rtrs_connection(sess);
881                         if (err)
882                                 rnbd_clt_put_sess(sess);
883                         mutex_lock(&sess_lock);
884
885                         if (err)
886                                 /* Session is dying, repeat the loop */
887                                 goto again;
888
889                         return sess;
890                 }
891                 /*
892                  * Ref is 0, session is dying, wait for RTRS disconnect
893                  * in order to avoid session names clashes.
894                  */
895                 wait_for_rtrs_disconnection(sess);
896                 /*
897                  * RTRS is disconnected and soon session will be freed,
898                  * so repeat a loop.
899                  */
900                 goto again;
901         }
902
903         return NULL;
904 }
905
906 static struct
907 rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first)
908 {
909         struct rnbd_clt_session *sess = NULL;
910
911         mutex_lock(&sess_lock);
912         sess = __find_and_get_sess(sessname);
913         if (!sess) {
914                 sess = alloc_sess(sessname);
915                 if (IS_ERR(sess)) {
916                         mutex_unlock(&sess_lock);
917                         return sess;
918                 }
919                 list_add(&sess->list, &sess_list);
920                 *first = true;
921         } else
922                 *first = false;
923         mutex_unlock(&sess_lock);
924
925         return sess;
926 }
927
928 static int rnbd_client_open(struct block_device *block_device, fmode_t mode)
929 {
930         struct rnbd_clt_dev *dev = block_device->bd_disk->private_data;
931
932         if (dev->read_only && (mode & FMODE_WRITE))
933                 return -EPERM;
934
935         if (dev->dev_state == DEV_STATE_UNMAPPED ||
936             !rnbd_clt_get_dev(dev))
937                 return -EIO;
938
939         return 0;
940 }
941
942 static void rnbd_client_release(struct gendisk *gen, fmode_t mode)
943 {
944         struct rnbd_clt_dev *dev = gen->private_data;
945
946         rnbd_clt_put_dev(dev);
947 }
948
949 static int rnbd_client_getgeo(struct block_device *block_device,
950                               struct hd_geometry *geo)
951 {
952         u64 size;
953         struct rnbd_clt_dev *dev;
954
955         dev = block_device->bd_disk->private_data;
956         size = dev->size * (dev->logical_block_size / SECTOR_SIZE);
957         geo->cylinders  = size >> 6;    /* size/64 */
958         geo->heads      = 4;
959         geo->sectors    = 16;
960         geo->start      = 0;
961
962         return 0;
963 }
964
965 static const struct block_device_operations rnbd_client_ops = {
966         .owner          = THIS_MODULE,
967         .open           = rnbd_client_open,
968         .release        = rnbd_client_release,
969         .getgeo         = rnbd_client_getgeo
970 };
971
972 /* The amount of data that belongs to an I/O and the amount of data that
973  * should be read or written to the disk (bi_size) can differ.
974  *
975  * E.g. When WRITE_SAME is used, only a small amount of data is
976  * transferred that is then written repeatedly over a lot of sectors.
977  *
978  * Get the size of data to be transferred via RTRS by summing up the size
979  * of the scather-gather list entries.
980  */
981 static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len)
982 {
983         struct scatterlist *sg;
984         size_t tsize = 0;
985         int i;
986
987         for_each_sg(sglist, sg, len, i)
988                 tsize += sg->length;
989         return tsize;
990 }
991
992 static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev,
993                                      struct request *rq,
994                                      struct rnbd_iu *iu)
995 {
996         struct rtrs_clt *rtrs = dev->sess->rtrs;
997         struct rtrs_permit *permit = iu->permit;
998         struct rnbd_msg_io msg;
999         struct rtrs_clt_req_ops req_ops;
1000         unsigned int sg_cnt = 0;
1001         struct kvec vec;
1002         size_t size;
1003         int err;
1004
1005         iu->rq          = rq;
1006         iu->dev         = dev;
1007         msg.sector      = cpu_to_le64(blk_rq_pos(rq));
1008         msg.bi_size     = cpu_to_le32(blk_rq_bytes(rq));
1009         msg.rw          = cpu_to_le32(rq_to_rnbd_flags(rq));
1010         msg.prio        = cpu_to_le16(req_get_ioprio(rq));
1011
1012         /*
1013          * We only support discards with single segment for now.
1014          * See queue limits.
1015          */
1016         if (req_op(rq) != REQ_OP_DISCARD)
1017                 sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sglist);
1018
1019         if (sg_cnt == 0)
1020                 /* Do not forget to mark the end */
1021                 sg_mark_end(&iu->sglist[0]);
1022
1023         msg.hdr.type    = cpu_to_le16(RNBD_MSG_IO);
1024         msg.device_id   = cpu_to_le32(dev->device_id);
1025
1026         vec = (struct kvec) {
1027                 .iov_base = &msg,
1028                 .iov_len  = sizeof(msg)
1029         };
1030         size = rnbd_clt_get_sg_size(iu->sglist, sg_cnt);
1031         req_ops = (struct rtrs_clt_req_ops) {
1032                 .priv = iu,
1033                 .conf_fn = msg_io_conf,
1034         };
1035         err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit,
1036                                &vec, 1, size, iu->sglist, sg_cnt);
1037         if (unlikely(err)) {
1038                 rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n",
1039                                  err);
1040                 return err;
1041         }
1042
1043         return 0;
1044 }
1045
1046 /**
1047  * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy
1048  * @dev:        Device to be checked
1049  * @q:          Queue to be added to the requeue list if required
1050  *
1051  * Description:
1052  *     If session is busy, that means someone will requeue us when resources
1053  *     are freed.  If session is not doing anything - device is not added to
1054  *     the list and @false is returned.
1055  */
1056 static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev,
1057                                                 struct rnbd_queue *q)
1058 {
1059         struct rnbd_clt_session *sess = dev->sess;
1060         struct rnbd_cpu_qlist *cpu_q;
1061         unsigned long flags;
1062         bool added = true;
1063         bool need_set;
1064
1065         cpu_q = get_cpu_ptr(sess->cpu_queues);
1066         spin_lock_irqsave(&cpu_q->requeue_lock, flags);
1067
1068         if (likely(!test_and_set_bit_lock(0, &q->in_list))) {
1069                 if (WARN_ON(!list_empty(&q->requeue_list)))
1070                         goto unlock;
1071
1072                 need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm);
1073                 if (need_set) {
1074                         set_bit(cpu_q->cpu, sess->cpu_queues_bm);
1075                         /* Paired with rnbd_put_permit(). Set a bit first
1076                          * and then observe the busy counter.
1077                          */
1078                         smp_mb__before_atomic();
1079                 }
1080                 if (likely(atomic_read(&sess->busy))) {
1081                         list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
1082                 } else {
1083                         /* Very unlikely, but possible: busy counter was
1084                          * observed as zero.  Drop all bits and return
1085                          * false to restart the queue by ourselves.
1086                          */
1087                         if (need_set)
1088                                 clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
1089                         clear_bit_unlock(0, &q->in_list);
1090                         added = false;
1091                 }
1092         }
1093 unlock:
1094         spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
1095         put_cpu_ptr(sess->cpu_queues);
1096
1097         return added;
1098 }
1099
1100 static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev,
1101                                         struct blk_mq_hw_ctx *hctx,
1102                                         int delay)
1103 {
1104         struct rnbd_queue *q = hctx->driver_data;
1105
1106         if (delay != RNBD_DELAY_IFBUSY)
1107                 blk_mq_delay_run_hw_queue(hctx, delay);
1108         else if (unlikely(!rnbd_clt_dev_add_to_requeue(dev, q)))
1109                 /*
1110                  * If session is not busy we have to restart
1111                  * the queue ourselves.
1112                  */
1113                 blk_mq_delay_run_hw_queue(hctx, 10/*ms*/);
1114 }
1115
1116 static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx,
1117                                    const struct blk_mq_queue_data *bd)
1118 {
1119         struct request *rq = bd->rq;
1120         struct rnbd_clt_dev *dev = rq->rq_disk->private_data;
1121         struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
1122         int err;
1123
1124         if (unlikely(dev->dev_state != DEV_STATE_MAPPED))
1125                 return BLK_STS_IOERR;
1126
1127         iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON,
1128                                       RTRS_PERMIT_NOWAIT);
1129         if (unlikely(!iu->permit)) {
1130                 rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY);
1131                 return BLK_STS_RESOURCE;
1132         }
1133
1134         blk_mq_start_request(rq);
1135         err = rnbd_client_xfer_request(dev, rq, iu);
1136         if (likely(err == 0))
1137                 return BLK_STS_OK;
1138         if (unlikely(err == -EAGAIN || err == -ENOMEM)) {
1139                 rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
1140                 rnbd_put_permit(dev->sess, iu->permit);
1141                 return BLK_STS_RESOURCE;
1142         }
1143
1144         rnbd_put_permit(dev->sess, iu->permit);
1145         return BLK_STS_IOERR;
1146 }
1147
1148 static int rnbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
1149                               unsigned int hctx_idx, unsigned int numa_node)
1150 {
1151         struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
1152
1153         sg_init_table(iu->sglist, BMAX_SEGMENTS);
1154         return 0;
1155 }
1156
1157 static struct blk_mq_ops rnbd_mq_ops = {
1158         .queue_rq       = rnbd_queue_rq,
1159         .init_request   = rnbd_init_request,
1160         .complete       = rnbd_softirq_done_fn,
1161 };
1162
1163 static int setup_mq_tags(struct rnbd_clt_session *sess)
1164 {
1165         struct blk_mq_tag_set *tag_set = &sess->tag_set;
1166
1167         memset(tag_set, 0, sizeof(*tag_set));
1168         tag_set->ops            = &rnbd_mq_ops;
1169         tag_set->queue_depth    = sess->queue_depth;
1170         tag_set->numa_node              = NUMA_NO_NODE;
1171         tag_set->flags          = BLK_MQ_F_SHOULD_MERGE |
1172                                   BLK_MQ_F_TAG_QUEUE_SHARED;
1173         tag_set->cmd_size               = sizeof(struct rnbd_iu);
1174         tag_set->nr_hw_queues   = num_online_cpus();
1175
1176         return blk_mq_alloc_tag_set(tag_set);
1177 }
1178
1179 static struct rnbd_clt_session *
1180 find_and_get_or_create_sess(const char *sessname,
1181                             const struct rtrs_addr *paths,
1182                             size_t path_cnt, u16 port_nr)
1183 {
1184         struct rnbd_clt_session *sess;
1185         struct rtrs_attrs attrs;
1186         int err;
1187         bool first;
1188         struct rtrs_clt_ops rtrs_ops;
1189
1190         sess = find_or_create_sess(sessname, &first);
1191         if (sess == ERR_PTR(-ENOMEM))
1192                 return ERR_PTR(-ENOMEM);
1193         else if (!first)
1194                 return sess;
1195
1196         if (!path_cnt) {
1197                 pr_err("Session %s not found, and path parameter not given", sessname);
1198                 err = -ENXIO;
1199                 goto put_sess;
1200         }
1201
1202         rtrs_ops = (struct rtrs_clt_ops) {
1203                 .priv = sess,
1204                 .link_ev = rnbd_clt_link_ev,
1205         };
1206         /*
1207          * Nothing was found, establish rtrs connection and proceed further.
1208          */
1209         sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
1210                                    paths, path_cnt, port_nr,
1211                                    sizeof(struct rnbd_iu),
1212                                    RECONNECT_DELAY, BMAX_SEGMENTS,
1213                                    BLK_MAX_SEGMENT_SIZE,
1214                                    MAX_RECONNECTS);
1215         if (IS_ERR(sess->rtrs)) {
1216                 err = PTR_ERR(sess->rtrs);
1217                 goto wake_up_and_put;
1218         }
1219         rtrs_clt_query(sess->rtrs, &attrs);
1220         sess->max_io_size = attrs.max_io_size;
1221         sess->queue_depth = attrs.queue_depth;
1222
1223         err = setup_mq_tags(sess);
1224         if (err)
1225                 goto close_rtrs;
1226
1227         err = send_msg_sess_info(sess, WAIT);
1228         if (err)
1229                 goto close_rtrs;
1230
1231         wake_up_rtrs_waiters(sess);
1232
1233         return sess;
1234
1235 close_rtrs:
1236         close_rtrs(sess);
1237 put_sess:
1238         rnbd_clt_put_sess(sess);
1239
1240         return ERR_PTR(err);
1241
1242 wake_up_and_put:
1243         wake_up_rtrs_waiters(sess);
1244         goto put_sess;
1245 }
1246
1247 static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev,
1248                                        struct rnbd_queue *q,
1249                                        struct blk_mq_hw_ctx *hctx)
1250 {
1251         INIT_LIST_HEAD(&q->requeue_list);
1252         q->dev  = dev;
1253         q->hctx = hctx;
1254 }
1255
1256 static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev)
1257 {
1258         int i;
1259         struct blk_mq_hw_ctx *hctx;
1260         struct rnbd_queue *q;
1261
1262         queue_for_each_hw_ctx(dev->queue, hctx, i) {
1263                 q = &dev->hw_queues[i];
1264                 rnbd_init_hw_queue(dev, q, hctx);
1265                 hctx->driver_data = q;
1266         }
1267 }
1268
1269 static int setup_mq_dev(struct rnbd_clt_dev *dev)
1270 {
1271         dev->queue = blk_mq_init_queue(&dev->sess->tag_set);
1272         if (IS_ERR(dev->queue)) {
1273                 rnbd_clt_err(dev, "Initializing multiqueue queue failed, err: %ld\n",
1274                               PTR_ERR(dev->queue));
1275                 return PTR_ERR(dev->queue);
1276         }
1277         rnbd_init_mq_hw_queues(dev);
1278         return 0;
1279 }
1280
1281 static void setup_request_queue(struct rnbd_clt_dev *dev)
1282 {
1283         blk_queue_logical_block_size(dev->queue, dev->logical_block_size);
1284         blk_queue_physical_block_size(dev->queue, dev->physical_block_size);
1285         blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors);
1286         blk_queue_max_write_same_sectors(dev->queue,
1287                                          dev->max_write_same_sectors);
1288
1289         /*
1290          * we don't support discards to "discontiguous" segments
1291          * in on request
1292          */
1293         blk_queue_max_discard_segments(dev->queue, 1);
1294
1295         blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors);
1296         dev->queue->limits.discard_granularity  = dev->discard_granularity;
1297         dev->queue->limits.discard_alignment    = dev->discard_alignment;
1298         if (dev->max_discard_sectors)
1299                 blk_queue_flag_set(QUEUE_FLAG_DISCARD, dev->queue);
1300         if (dev->secure_discard)
1301                 blk_queue_flag_set(QUEUE_FLAG_SECERASE, dev->queue);
1302
1303         blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue);
1304         blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue);
1305         blk_queue_max_segments(dev->queue, dev->max_segments);
1306         blk_queue_io_opt(dev->queue, dev->sess->max_io_size);
1307         blk_queue_virt_boundary(dev->queue, SZ_4K - 1);
1308         blk_queue_write_cache(dev->queue, true, true);
1309         dev->queue->queuedata = dev;
1310 }
1311
1312 static void rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx)
1313 {
1314         dev->gd->major          = rnbd_client_major;
1315         dev->gd->first_minor    = idx << RNBD_PART_BITS;
1316         dev->gd->fops           = &rnbd_client_ops;
1317         dev->gd->queue          = dev->queue;
1318         dev->gd->private_data   = dev;
1319         snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d",
1320                  idx);
1321         pr_debug("disk_name=%s, capacity=%zu\n",
1322                  dev->gd->disk_name,
1323                  dev->nsectors * (dev->logical_block_size / SECTOR_SIZE)
1324                  );
1325
1326         set_capacity(dev->gd, dev->nsectors);
1327
1328         if (dev->access_mode == RNBD_ACCESS_RO) {
1329                 dev->read_only = true;
1330                 set_disk_ro(dev->gd, true);
1331         } else {
1332                 dev->read_only = false;
1333         }
1334
1335         if (!dev->rotational)
1336                 blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue);
1337 }
1338
1339 static int rnbd_client_setup_device(struct rnbd_clt_session *sess,
1340                                      struct rnbd_clt_dev *dev, int idx)
1341 {
1342         int err;
1343
1344         dev->size = dev->nsectors * dev->logical_block_size;
1345
1346         err = setup_mq_dev(dev);
1347         if (err)
1348                 return err;
1349
1350         setup_request_queue(dev);
1351
1352         dev->gd = alloc_disk_node(1 << RNBD_PART_BITS,  NUMA_NO_NODE);
1353         if (!dev->gd) {
1354                 blk_cleanup_queue(dev->queue);
1355                 return -ENOMEM;
1356         }
1357
1358         rnbd_clt_setup_gen_disk(dev, idx);
1359
1360         return 0;
1361 }
1362
1363 static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess,
1364                                       enum rnbd_access_mode access_mode,
1365                                       const char *pathname)
1366 {
1367         struct rnbd_clt_dev *dev;
1368         int ret;
1369
1370         dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE);
1371         if (!dev)
1372                 return ERR_PTR(-ENOMEM);
1373
1374         dev->hw_queues = kcalloc(nr_cpu_ids, sizeof(*dev->hw_queues),
1375                                  GFP_KERNEL);
1376         if (!dev->hw_queues) {
1377                 ret = -ENOMEM;
1378                 goto out_alloc;
1379         }
1380
1381         mutex_lock(&ida_lock);
1382         ret = ida_simple_get(&index_ida, 0, 1 << (MINORBITS - RNBD_PART_BITS),
1383                              GFP_KERNEL);
1384         mutex_unlock(&ida_lock);
1385         if (ret < 0) {
1386                 pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n",
1387                        pathname, sess->sessname, ret);
1388                 goto out_queues;
1389         }
1390
1391         dev->pathname = kzalloc(strlen(pathname) + 1, GFP_KERNEL);
1392         if (!dev->pathname) {
1393                 ret = -ENOMEM;
1394                 goto out_queues;
1395         }
1396         strlcpy(dev->pathname, pathname, strlen(pathname) + 1);
1397
1398         dev->clt_device_id      = ret;
1399         dev->sess               = sess;
1400         dev->access_mode        = access_mode;
1401         mutex_init(&dev->lock);
1402         refcount_set(&dev->refcount, 1);
1403         dev->dev_state = DEV_STATE_INIT;
1404
1405         /*
1406          * Here we called from sysfs entry, thus clt-sysfs is
1407          * responsible that session will not disappear.
1408          */
1409         WARN_ON(!rnbd_clt_get_sess(sess));
1410
1411         return dev;
1412
1413 out_queues:
1414         kfree(dev->hw_queues);
1415 out_alloc:
1416         kfree(dev);
1417         return ERR_PTR(ret);
1418 }
1419
1420 static bool __exists_dev(const char *pathname, const char *sessname)
1421 {
1422         struct rnbd_clt_session *sess;
1423         struct rnbd_clt_dev *dev;
1424         bool found = false;
1425
1426         list_for_each_entry(sess, &sess_list, list) {
1427                 if (sessname && strncmp(sess->sessname, sessname,
1428                                         sizeof(sess->sessname)))
1429                         continue;
1430                 mutex_lock(&sess->lock);
1431                 list_for_each_entry(dev, &sess->devs_list, list) {
1432                         if (strlen(dev->pathname) == strlen(pathname) &&
1433                             !strcmp(dev->pathname, pathname)) {
1434                                 found = true;
1435                                 break;
1436                         }
1437                 }
1438                 mutex_unlock(&sess->lock);
1439                 if (found)
1440                         break;
1441         }
1442
1443         return found;
1444 }
1445
1446 static bool exists_devpath(const char *pathname, const char *sessname)
1447 {
1448         bool found;
1449
1450         mutex_lock(&sess_lock);
1451         found = __exists_dev(pathname, sessname);
1452         mutex_unlock(&sess_lock);
1453
1454         return found;
1455 }
1456
1457 static bool insert_dev_if_not_exists_devpath(const char *pathname,
1458                                              struct rnbd_clt_session *sess,
1459                                              struct rnbd_clt_dev *dev)
1460 {
1461         bool found;
1462
1463         mutex_lock(&sess_lock);
1464         found = __exists_dev(pathname, sess->sessname);
1465         if (!found) {
1466                 mutex_lock(&sess->lock);
1467                 list_add_tail(&dev->list, &sess->devs_list);
1468                 mutex_unlock(&sess->lock);
1469         }
1470         mutex_unlock(&sess_lock);
1471
1472         return found;
1473 }
1474
1475 static void delete_dev(struct rnbd_clt_dev *dev)
1476 {
1477         struct rnbd_clt_session *sess = dev->sess;
1478
1479         mutex_lock(&sess->lock);
1480         list_del(&dev->list);
1481         mutex_unlock(&sess->lock);
1482 }
1483
1484 struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname,
1485                                            struct rtrs_addr *paths,
1486                                            size_t path_cnt, u16 port_nr,
1487                                            const char *pathname,
1488                                            enum rnbd_access_mode access_mode)
1489 {
1490         struct rnbd_clt_session *sess;
1491         struct rnbd_clt_dev *dev;
1492         int ret;
1493
1494         if (unlikely(exists_devpath(pathname, sessname)))
1495                 return ERR_PTR(-EEXIST);
1496
1497         sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr);
1498         if (IS_ERR(sess))
1499                 return ERR_CAST(sess);
1500
1501         dev = init_dev(sess, access_mode, pathname);
1502         if (IS_ERR(dev)) {
1503                 pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n",
1504                        pathname, sess->sessname, PTR_ERR(dev));
1505                 ret = PTR_ERR(dev);
1506                 goto put_sess;
1507         }
1508         if (insert_dev_if_not_exists_devpath(pathname, sess, dev)) {
1509                 ret = -EEXIST;
1510                 goto put_dev;
1511         }
1512         ret = send_msg_open(dev, WAIT);
1513         if (ret) {
1514                 rnbd_clt_err(dev,
1515                               "map_device: failed, can't open remote device, err: %d\n",
1516                               ret);
1517                 goto del_dev;
1518         }
1519         mutex_lock(&dev->lock);
1520         pr_debug("Opened remote device: session=%s, path='%s'\n",
1521                  sess->sessname, pathname);
1522         ret = rnbd_client_setup_device(sess, dev, dev->clt_device_id);
1523         if (ret) {
1524                 rnbd_clt_err(dev,
1525                               "map_device: Failed to configure device, err: %d\n",
1526                               ret);
1527                 mutex_unlock(&dev->lock);
1528                 goto send_close;
1529         }
1530
1531         rnbd_clt_info(dev,
1532                        "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_write_same_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, rotational: %d)\n",
1533                        dev->gd->disk_name, dev->nsectors,
1534                        dev->logical_block_size, dev->physical_block_size,
1535                        dev->max_write_same_sectors, dev->max_discard_sectors,
1536                        dev->discard_granularity, dev->discard_alignment,
1537                        dev->secure_discard, dev->max_segments,
1538                        dev->max_hw_sectors, dev->rotational);
1539
1540         mutex_unlock(&dev->lock);
1541
1542         add_disk(dev->gd);
1543         rnbd_clt_put_sess(sess);
1544
1545         return dev;
1546
1547 send_close:
1548         send_msg_close(dev, dev->device_id, WAIT);
1549 del_dev:
1550         delete_dev(dev);
1551 put_dev:
1552         rnbd_clt_put_dev(dev);
1553 put_sess:
1554         rnbd_clt_put_sess(sess);
1555
1556         return ERR_PTR(ret);
1557 }
1558
1559 static void destroy_gen_disk(struct rnbd_clt_dev *dev)
1560 {
1561         del_gendisk(dev->gd);
1562         blk_cleanup_queue(dev->queue);
1563         put_disk(dev->gd);
1564 }
1565
1566 static void destroy_sysfs(struct rnbd_clt_dev *dev,
1567                           const struct attribute *sysfs_self)
1568 {
1569         rnbd_clt_remove_dev_symlink(dev);
1570         if (dev->kobj.state_initialized) {
1571                 if (sysfs_self)
1572                         /* To avoid deadlock firstly remove itself */
1573                         sysfs_remove_file_self(&dev->kobj, sysfs_self);
1574                 kobject_del(&dev->kobj);
1575                 kobject_put(&dev->kobj);
1576         }
1577 }
1578
1579 int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force,
1580                            const struct attribute *sysfs_self)
1581 {
1582         struct rnbd_clt_session *sess = dev->sess;
1583         int refcount, ret = 0;
1584         bool was_mapped;
1585
1586         mutex_lock(&dev->lock);
1587         if (dev->dev_state == DEV_STATE_UNMAPPED) {
1588                 rnbd_clt_info(dev, "Device is already being unmapped\n");
1589                 ret = -EALREADY;
1590                 goto err;
1591         }
1592         refcount = refcount_read(&dev->refcount);
1593         if (!force && refcount > 1) {
1594                 rnbd_clt_err(dev,
1595                               "Closing device failed, device is in use, (%d device users)\n",
1596                               refcount - 1);
1597                 ret = -EBUSY;
1598                 goto err;
1599         }
1600         was_mapped = (dev->dev_state == DEV_STATE_MAPPED);
1601         dev->dev_state = DEV_STATE_UNMAPPED;
1602         mutex_unlock(&dev->lock);
1603
1604         delete_dev(dev);
1605         destroy_sysfs(dev, sysfs_self);
1606         destroy_gen_disk(dev);
1607         if (was_mapped && sess->rtrs)
1608                 send_msg_close(dev, dev->device_id, WAIT);
1609
1610         rnbd_clt_info(dev, "Device is unmapped\n");
1611
1612         /* Likely last reference put */
1613         rnbd_clt_put_dev(dev);
1614
1615         /*
1616          * Here device and session can be vanished!
1617          */
1618
1619         return 0;
1620 err:
1621         mutex_unlock(&dev->lock);
1622
1623         return ret;
1624 }
1625
1626 int rnbd_clt_remap_device(struct rnbd_clt_dev *dev)
1627 {
1628         int err;
1629
1630         mutex_lock(&dev->lock);
1631         if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED)
1632                 err = 0;
1633         else if (dev->dev_state == DEV_STATE_UNMAPPED)
1634                 err = -ENODEV;
1635         else if (dev->dev_state == DEV_STATE_MAPPED)
1636                 err = -EALREADY;
1637         else
1638                 err = -EBUSY;
1639         mutex_unlock(&dev->lock);
1640         if (!err) {
1641                 rnbd_clt_info(dev, "Remapping device.\n");
1642                 err = send_msg_open(dev, WAIT);
1643                 if (err)
1644                         rnbd_clt_err(dev, "remap_device: %d\n", err);
1645         }
1646
1647         return err;
1648 }
1649
1650 static void unmap_device_work(struct work_struct *work)
1651 {
1652         struct rnbd_clt_dev *dev;
1653
1654         dev = container_of(work, typeof(*dev), unmap_on_rmmod_work);
1655         rnbd_clt_unmap_device(dev, true, NULL);
1656 }
1657
1658 static void rnbd_destroy_sessions(void)
1659 {
1660         struct rnbd_clt_session *sess, *sn;
1661         struct rnbd_clt_dev *dev, *tn;
1662
1663         /* Firstly forbid access through sysfs interface */
1664         rnbd_clt_destroy_default_group();
1665         rnbd_clt_destroy_sysfs_files();
1666
1667         /*
1668          * Here at this point there is no any concurrent access to sessions
1669          * list and devices list:
1670          *   1. New session or device can'be be created - session sysfs files
1671          *      are removed.
1672          *   2. Device or session can't be removed - module reference is taken
1673          *      into account in unmap device sysfs callback.
1674          *   3. No IO requests inflight - each file open of block_dev increases
1675          *      module reference in get_disk().
1676          *
1677          * But still there can be user requests inflights, which are sent by
1678          * asynchronous send_msg_*() functions, thus before unmapping devices
1679          * RTRS session must be explicitly closed.
1680          */
1681
1682         list_for_each_entry_safe(sess, sn, &sess_list, list) {
1683                 WARN_ON(!rnbd_clt_get_sess(sess));
1684                 close_rtrs(sess);
1685                 list_for_each_entry_safe(dev, tn, &sess->devs_list, list) {
1686                         /*
1687                          * Here unmap happens in parallel for only one reason:
1688                          * blk_cleanup_queue() takes around half a second, so
1689                          * on huge amount of devices the whole module unload
1690                          * procedure takes minutes.
1691                          */
1692                         INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work);
1693                         queue_work(system_long_wq, &dev->unmap_on_rmmod_work);
1694                 }
1695                 rnbd_clt_put_sess(sess);
1696         }
1697         /* Wait for all scheduled unmap works */
1698         flush_workqueue(system_long_wq);
1699         WARN_ON(!list_empty(&sess_list));
1700 }
1701
1702 static int __init rnbd_client_init(void)
1703 {
1704         int err = 0;
1705
1706         BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4);
1707         BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36);
1708         BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36);
1709         BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264);
1710         BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8);
1711         BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56);
1712         rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd");
1713         if (rnbd_client_major <= 0) {
1714                 pr_err("Failed to load module, block device registration failed\n");
1715                 return -EBUSY;
1716         }
1717
1718         err = rnbd_clt_create_sysfs_files();
1719         if (err) {
1720                 pr_err("Failed to load module, creating sysfs device files failed, err: %d\n",
1721                        err);
1722                 unregister_blkdev(rnbd_client_major, "rnbd");
1723         }
1724
1725         return err;
1726 }
1727
1728 static void __exit rnbd_client_exit(void)
1729 {
1730         rnbd_destroy_sessions();
1731         unregister_blkdev(rnbd_client_major, "rnbd");
1732         ida_destroy(&index_ida);
1733 }
1734
1735 module_init(rnbd_client_init);
1736 module_exit(rnbd_client_exit);