// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Network Block Driver
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/blkdev.h>
#include <linux/hdreg.h>
#include <linux/scatterlist.h>
#include <linux/idr.h>

#include "rnbd-clt.h"

MODULE_DESCRIPTION("RDMA Network Block Device Client");
MODULE_LICENSE("GPL");

static int rnbd_client_major;
static DEFINE_IDA(index_ida);
static DEFINE_MUTEX(ida_lock);
static DEFINE_MUTEX(sess_lock);
static LIST_HEAD(sess_list);

/*
 * Maximum number of partitions an instance can have.
 * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
 */
#define RNBD_PART_BITS          6

static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
{
        return refcount_inc_not_zero(&sess->refcount);
}

static void free_sess(struct rnbd_clt_session *sess);

static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
{
        might_sleep();

        if (refcount_dec_and_test(&sess->refcount))
                free_sess(sess);
}

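/*
 * Drop a device reference.  On the last put the minor index goes back to
 * the IDA, the per-CPU hw_queues array is freed and the session reference
 * taken in init_dev() is dropped.
 */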
static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev)
{
        might_sleep();

        if (!refcount_dec_and_test(&dev->refcount))
                return;

        mutex_lock(&ida_lock);
        ida_simple_remove(&index_ida, dev->clt_device_id);
        mutex_unlock(&ida_lock);
        kfree(dev->hw_queues);
        rnbd_clt_put_sess(dev->sess);
        mutex_destroy(&dev->lock);
        kfree(dev);
}

static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev)
{
        return refcount_inc_not_zero(&dev->refcount);
}

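/*
 * Copy the device attributes the server reported in the open response
 * from wire (little-endian) format into host byte order.  A zero logical
 * block size is rejected as invalid.
 */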
static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev,
                                 const struct rnbd_msg_open_rsp *rsp)
{
        struct rnbd_clt_session *sess = dev->sess;

        if (!rsp->logical_block_size)
                return -EINVAL;

        dev->device_id              = le32_to_cpu(rsp->device_id);
        dev->nsectors               = le64_to_cpu(rsp->nsectors);
        dev->logical_block_size     = le16_to_cpu(rsp->logical_block_size);
        dev->physical_block_size    = le16_to_cpu(rsp->physical_block_size);
        dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors);
        dev->max_discard_sectors    = le32_to_cpu(rsp->max_discard_sectors);
        dev->discard_granularity    = le32_to_cpu(rsp->discard_granularity);
        dev->discard_alignment      = le32_to_cpu(rsp->discard_alignment);
        dev->secure_discard         = le16_to_cpu(rsp->secure_discard);
        dev->rotational             = rsp->rotational;

        dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE;
        dev->max_segments = BMAX_SEGMENTS;

        return 0;
}

static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev,
                                    size_t new_nsectors)
{
        rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n",
                       dev->nsectors, new_nsectors);
        dev->nsectors = new_nsectors;
        set_capacity(dev->gd, dev->nsectors);
        revalidate_disk_size(dev->gd, true);
        return 0;
}

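/*
 * Handle the server response to RNBD_MSG_OPEN.  If the device was
 * remapped after a reconnect, the capacity is revalidated here in case
 * it changed on the server side in the meantime.
 */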
static int process_msg_open_rsp(struct rnbd_clt_dev *dev,
                                struct rnbd_msg_open_rsp *rsp)
{
        int err = 0;

        mutex_lock(&dev->lock);
        if (dev->dev_state == DEV_STATE_UNMAPPED) {
                rnbd_clt_info(dev,
                               "Ignoring Open-Response message from server for unmapped device\n");
                err = -ENOENT;
                goto out;
        }
        if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) {
                u64 nsectors = le64_to_cpu(rsp->nsectors);

                /*
                 * If the device was remapped and the size changed in the
                 * meantime we need to revalidate it
                 */
                if (dev->nsectors != nsectors)
                        rnbd_clt_change_capacity(dev, nsectors);
                rnbd_clt_info(dev, "Device online, device remapped successfully\n");
        }
        err = rnbd_clt_set_dev_attr(dev, rsp);
        if (err)
                goto out;
        dev->dev_state = DEV_STATE_MAPPED;

out:
        mutex_unlock(&dev->lock);

        return err;
}

int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize)
{
        int ret = 0;

        mutex_lock(&dev->lock);
        if (dev->dev_state != DEV_STATE_MAPPED) {
                pr_err("Failed to set new size of the device, device is not opened\n");
                ret = -ENOENT;
                goto out;
        }
        ret = rnbd_clt_change_capacity(dev, newsize);

out:
        mutex_unlock(&dev->lock);

        return ret;
}

static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
{
        if (WARN_ON(!q->hctx))
                return;

        /* We can come here from interrupt, thus async=true */
        blk_mq_run_hw_queue(q->hctx, true);
}

enum {
        RNBD_DELAY_IFBUSY = -1,
};

/**
 * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun
 * @sess:       Session to find a queue for
 * @cpu:        Cpu to start the search from
 *
 * Description:
 *     Each CPU has a list of HW queues, which need to be rerun.  If a list
 *     is not empty, it is marked with a bit.  This function finds the first
 *     set bit in the bitmap and returns the corresponding CPU list.
 */
static struct rnbd_cpu_qlist *
rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
{
        int bit;

        /* Search from cpu to nr_cpu_ids */
        bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu);
        if (bit < nr_cpu_ids) {
                return per_cpu_ptr(sess->cpu_queues, bit);
        } else if (cpu != 0) {
                /* Search from 0 to cpu */
                bit = find_next_bit(sess->cpu_queues_bm, cpu, 0);
                if (bit < cpu)
                        return per_cpu_ptr(sess->cpu_queues, bit);
        }

        return NULL;
}

static inline int nxt_cpu(int cpu)
{
        return (cpu + 1) % nr_cpu_ids;
}

/**
 * rnbd_rerun_if_needed() - rerun next queue marked as stopped
 * @sess:       Session to rerun a queue on
 *
 * Description:
 *     Each CPU has its own list of HW queues, which should be rerun.
 *     This function finds such a list with HW queues, takes the list lock,
 *     picks up the first HW queue out of the list and requeues it.
 *
 * Return:
 *     True if the queue was requeued, false otherwise.
 *
 * Context:
 *     Does not matter.
 */
static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)
{
        struct rnbd_queue *q = NULL;
        struct rnbd_cpu_qlist *cpu_q;
        unsigned long flags;
        int *cpup;

        /*
         * To keep fairness and not to let other queues starve we always
         * try to wake up someone else in a round-robin manner.  That of
         * course increases latency, but queues always have a chance to
         * be executed.
         */
        cpup = get_cpu_ptr(sess->cpu_rr);
        for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
             cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) {
                if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
                        continue;
                if (unlikely(!test_bit(cpu_q->cpu, sess->cpu_queues_bm)))
                        goto unlock;
                q = list_first_entry_or_null(&cpu_q->requeue_list,
                                             typeof(*q), requeue_list);
                if (WARN_ON(!q))
                        goto clear_bit;
                list_del_init(&q->requeue_list);
                clear_bit_unlock(0, &q->in_list);

                if (list_empty(&cpu_q->requeue_list)) {
                        /* Clear bit if nothing is left */
clear_bit:
                        clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
                }
unlock:
                spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);

                if (q)
                        break;
        }

        /*
         * Save the CPU that is going to be requeued in the per-cpu var.
         * Just incrementing it doesn't work, because rnbd_get_cpu_qlist()
         * will always return the first CPU with something on the queue
         * list when the value stored in the var is greater than the last
         * CPU with something on the list.
         */
        if (cpu_q)
                *cpup = cpu_q->cpu;
        put_cpu_var(sess->cpu_rr);

        if (q)
                rnbd_clt_dev_requeue(q);

        return q;
}

/**
 * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
 *                               session is idling (there are no requests
 *                               in-flight).
 * @sess:       Session to rerun the queues on
 *
 * Description:
 *     This function tries to rerun all stopped queues if there are no
 *     requests in-flight anymore.  It addresses an obvious problem when
 *     the number of tags is less than the number of queues (hctxs) that
 *     are stopped and put to sleep.  If the last permit, which has just
 *     been put, does not wake up all the remaining queues (hctxs), IO
 *     requests hang forever.
 *
 *     That can happen when all permits, say N, have been exhausted from
 *     one CPU, and we have many block devices per session, say M.
 *     Each block device has its own queue (hctx) for each CPU, so
 *     eventually we can put that number of queues (hctxs) to sleep:
 *     M x nr_cpu_ids.  If the number of permits N < M x nr_cpu_ids, we
 *     eventually get an IO hang.
 *
 *     To avoid this hang the last caller of rnbd_put_permit() (the one
 *     who observes sess->busy == 0) must wake up all remaining queues.
 *
 * Context:
 *     Does not matter.
 */
static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
{
        bool requeued;

        do {
                requeued = rnbd_rerun_if_needed(sess);
        } while (atomic_read(&sess->busy) == 0 && requeued);
}

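/*
 * A permit is an RTRS flow-control token: an IO may only be sent to the
 * server while holding one.  sess->busy counts the permits in use and
 * pairs with the requeue machinery above to decide when stopped queues
 * have to be woken up.
 */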
static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess,
                                             enum rtrs_clt_con_type con_type,
                                             int wait)
{
        struct rtrs_permit *permit;

        permit = rtrs_clt_get_permit(sess->rtrs, con_type,
                                      wait ? RTRS_PERMIT_WAIT :
                                      RTRS_PERMIT_NOWAIT);
        if (likely(permit))
                /* We have a subtle rare case here, when all permits can be
                 * consumed before the busy counter is increased.  This is
                 * safe, because the loser will get NULL as a permit,
                 * observe 0 busy counter and immediately restart the queue
                 * itself.
                 */
                atomic_inc(&sess->busy);

        return permit;
}

static void rnbd_put_permit(struct rnbd_clt_session *sess,
                             struct rtrs_permit *permit)
{
        rtrs_clt_put_permit(sess->rtrs, permit);
        atomic_dec(&sess->busy);
        /* Paired with rnbd_clt_dev_add_to_requeue().  Decrement first
         * and then check queue bits.
         */
        smp_mb__after_atomic();
        rnbd_rerun_all_if_idle(sess);
}

static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess,
                                     enum rtrs_clt_con_type con_type,
                                     int wait)
{
        struct rnbd_iu *iu;
        struct rtrs_permit *permit;

        permit = rnbd_get_permit(sess, con_type,
                                  wait ? RTRS_PERMIT_WAIT :
                                  RTRS_PERMIT_NOWAIT);
        if (unlikely(!permit))
                return NULL;
        iu = rtrs_permit_to_pdu(permit);
        iu->permit = permit;
        /*
         * 1st reference is dropped after finishing sending a "user" message,
         * 2nd reference is dropped after the confirmation with the response
         * is returned.
         * 1st and 2nd can happen in any order, so the rnbd_iu should be
         * released (rtrs_permit returned to RTRS) only after both are
         * finished.
         */
        atomic_set(&iu->refcount, 2);
        init_waitqueue_head(&iu->comp.wait);
        iu->comp.errno = INT_MAX;

        return iu;
}

static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu)
{
        if (atomic_dec_and_test(&iu->refcount))
                rnbd_put_permit(sess, iu->permit);
}

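/*
 * Block layer completion path (see rnbd_mq_ops.complete): return the
 * permit taken in rnbd_queue_rq() and finish the request with the status
 * the server reported for it.
 */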
static void rnbd_softirq_done_fn(struct request *rq)
{
        struct rnbd_clt_dev *dev        = rq->rq_disk->private_data;
        struct rnbd_clt_session *sess   = dev->sess;
        struct rnbd_iu *iu;

        iu = blk_mq_rq_to_pdu(rq);
        rnbd_put_permit(sess, iu->permit);
        blk_mq_end_request(rq, errno_to_blk_status(iu->errno));
}

static void msg_io_conf(void *priv, int errno)
{
        struct rnbd_iu *iu = priv;
        struct rnbd_clt_dev *dev = iu->dev;
        struct request *rq = iu->rq;
        int rw = rq_data_dir(rq);

        iu->errno = errno;

        blk_mq_complete_request(rq);

        if (errno)
                rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n",
                                 rw == READ ? "read" : "write", errno);
}

static void wake_up_iu_comp(struct rnbd_iu *iu, int errno)
{
        iu->comp.errno = errno;
        wake_up(&iu->comp.wait);
}

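/*
 * Confirmation callback for user (non-IO) messages: only the error is
 * recorded here, the actual processing (msg_open_conf() and friends) is
 * deferred to a work item, keeping the RTRS confirmation context free of
 * heavy work.
 */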
static void msg_conf(void *priv, int errno)
{
        struct rnbd_iu *iu = priv;

        iu->errno = errno;
        schedule_work(&iu->work);
}

enum wait_type {
        NO_WAIT = 0,
        WAIT    = 1
};

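/*
 * Send a user message over RTRS.  With WAIT the caller sleeps until
 * wake_up_iu_comp() flips iu->comp.errno away from the INT_MAX sentinel
 * set in rnbd_get_iu().
 */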
static int send_usr_msg(struct rtrs_clt *rtrs, int dir,
                        struct rnbd_iu *iu, struct kvec *vec,
                        size_t len, struct scatterlist *sg, unsigned int sg_len,
                        void (*conf)(struct work_struct *work),
                        int *errno, enum wait_type wait)
{
        int err;
        struct rtrs_clt_req_ops req_ops;

        INIT_WORK(&iu->work, conf);
        req_ops = (struct rtrs_clt_req_ops) {
                .priv = iu,
                .conf_fn = msg_conf,
        };
        err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit,
                                vec, 1, len, sg, sg_len);
        if (!err && wait) {
                wait_event(iu->comp.wait, iu->comp.errno != INT_MAX);
                *errno = iu->comp.errno;
        } else {
                *errno = 0;
        }

        return err;
}

static void msg_close_conf(struct work_struct *work)
{
        struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
        struct rnbd_clt_dev *dev = iu->dev;

        wake_up_iu_comp(iu, iu->errno);
        rnbd_put_iu(dev->sess, iu);
        rnbd_clt_put_dev(dev);
}

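/*
 * Ask the server to close a device.  The iu starts with a refcount of
 * two (see rnbd_get_iu()): one put happens in msg_close_conf() once the
 * confirmation arrives, the other one at the end of this function.
 */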
static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, bool wait)
{
        struct rnbd_clt_session *sess = dev->sess;
        struct rnbd_msg_close msg;
        struct rnbd_iu *iu;
        struct kvec vec = {
                .iov_base = &msg,
                .iov_len  = sizeof(msg)
        };
        int err, errno;

        iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
        if (!iu)
                return -ENOMEM;

        iu->buf = NULL;
        iu->dev = dev;

        sg_mark_end(&iu->sglist[0]);

        msg.hdr.type    = cpu_to_le16(RNBD_MSG_CLOSE);
        msg.device_id   = cpu_to_le32(device_id);

        WARN_ON(!rnbd_clt_get_dev(dev));
        err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0,
                           msg_close_conf, &errno, wait);
        if (err) {
                rnbd_clt_put_dev(dev);
                rnbd_put_iu(sess, iu);
        } else {
                err = errno;
        }

        rnbd_put_iu(sess, iu);
        return err;
}

static void msg_open_conf(struct work_struct *work)
{
        struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
        struct rnbd_msg_open_rsp *rsp = iu->buf;
        struct rnbd_clt_dev *dev = iu->dev;
        int errno = iu->errno;

        if (errno) {
                rnbd_clt_err(dev,
                              "Opening failed, server responded: %d\n",
                              errno);
        } else {
                errno = process_msg_open_rsp(dev, rsp);
                if (errno) {
                        u32 device_id = le32_to_cpu(rsp->device_id);
                        /*
                         * If the server thinks it's fine, but we fail to
                         * process the response, be nice and send a close
                         * to the server.
                         */
                        (void)send_msg_close(dev, device_id, NO_WAIT);
                }
        }
        kfree(rsp);
        wake_up_iu_comp(iu, errno);
        rnbd_put_iu(dev->sess, iu);
        rnbd_clt_put_dev(dev);
}

static void msg_sess_info_conf(struct work_struct *work)
{
        struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
        struct rnbd_msg_sess_info_rsp *rsp = iu->buf;
        struct rnbd_clt_session *sess = iu->sess;

        if (!iu->errno)
                sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR);

        kfree(rsp);
        wake_up_iu_comp(iu, iu->errno);
        rnbd_put_iu(sess, iu);
        rnbd_clt_put_sess(sess);
}

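/*
 * Open a device on the server.  The response buffer belongs to the iu
 * until msg_open_conf() runs, which also frees it; it is only freed here
 * on the send-error path, where the confirmation will never fire.
 */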
static int send_msg_open(struct rnbd_clt_dev *dev, bool wait)
{
        struct rnbd_clt_session *sess = dev->sess;
        struct rnbd_msg_open_rsp *rsp;
        struct rnbd_msg_open msg;
        struct rnbd_iu *iu;
        struct kvec vec = {
                .iov_base = &msg,
                .iov_len  = sizeof(msg)
        };
        int err, errno;

        rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
        if (!rsp)
                return -ENOMEM;

        iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
        if (!iu) {
                kfree(rsp);
                return -ENOMEM;
        }

        iu->buf = rsp;
        iu->dev = dev;

        sg_init_one(iu->sglist, rsp, sizeof(*rsp));

        msg.hdr.type    = cpu_to_le16(RNBD_MSG_OPEN);
        msg.access_mode = dev->access_mode;
        strlcpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));

        WARN_ON(!rnbd_clt_get_dev(dev));
        err = send_usr_msg(sess->rtrs, READ, iu,
                           &vec, sizeof(*rsp), iu->sglist, 1,
                           msg_open_conf, &errno, wait);
        if (err) {
                rnbd_clt_put_dev(dev);
                rnbd_put_iu(sess, iu);
                kfree(rsp);
        } else {
                err = errno;
        }

        rnbd_put_iu(sess, iu);
        return err;
}

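/*
 * Exchange protocol versions with the server.  The session version is
 * negotiated down to min(server version, RNBD_PROTO_VER_MAJOR) in
 * msg_sess_info_conf().
 */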
static int send_msg_sess_info(struct rnbd_clt_session *sess, bool wait)
{
        struct rnbd_msg_sess_info_rsp *rsp;
        struct rnbd_msg_sess_info msg;
        struct rnbd_iu *iu;
        struct kvec vec = {
                .iov_base = &msg,
                .iov_len  = sizeof(msg)
        };
        int err, errno;

        rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
        if (!rsp)
                return -ENOMEM;

        iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
        if (!iu) {
                kfree(rsp);
                return -ENOMEM;
        }

        iu->buf = rsp;
        iu->sess = sess;

        sg_init_one(iu->sglist, rsp, sizeof(*rsp));

        msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO);
        msg.ver      = RNBD_PROTO_VER_MAJOR;

        if (!rnbd_clt_get_sess(sess)) {
                /*
                 * That can happen only in one case, when RTRS has
                 * re-established the connection and link_ev() is called,
                 * but the session is almost dead, the last reference on
                 * the session is put and the caller is waiting for RTRS
                 * to close everything.
                 */
                err = -ENODEV;
                goto put_iu;
        }
        err = send_usr_msg(sess->rtrs, READ, iu,
                           &vec, sizeof(*rsp), iu->sglist, 1,
                           msg_sess_info_conf, &errno, wait);
        if (err) {
                rnbd_clt_put_sess(sess);
put_iu:
                rnbd_put_iu(sess, iu);
                kfree(rsp);
        } else {
                err = errno;
        }

        rnbd_put_iu(sess, iu);
        return err;
}

static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess)
{
        struct rnbd_clt_dev *dev;

        mutex_lock(&sess->lock);
        list_for_each_entry(dev, &sess->devs_list, list) {
                rnbd_clt_err(dev, "Device disconnected.\n");

                mutex_lock(&dev->lock);
                if (dev->dev_state == DEV_STATE_MAPPED)
                        dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED;
                mutex_unlock(&dev->lock);
        }
        mutex_unlock(&sess->lock);
}

static void remap_devs(struct rnbd_clt_session *sess)
{
        struct rnbd_clt_dev *dev;
        struct rtrs_attrs attrs;
        int err;

        /*
         * Careful here: we are called from an RTRS link event directly,
         * thus we can't send any RTRS request and wait for a response,
         * or RTRS will not be able to complete the request with failure
         * if something goes wrong (failing of outstanding requests
         * happens exactly from the context where we would be blocking).
         *
         * So to avoid deadlocks each user message sent from here must
         * be asynchronous.
         */

        err = send_msg_sess_info(sess, NO_WAIT);
        if (err) {
                pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err);
                return;
        }

        rtrs_clt_query(sess->rtrs, &attrs);
        mutex_lock(&sess->lock);
        sess->max_io_size = attrs.max_io_size;

        list_for_each_entry(dev, &sess->devs_list, list) {
                bool skip;

                mutex_lock(&dev->lock);
                skip = (dev->dev_state == DEV_STATE_INIT);
                mutex_unlock(&dev->lock);
                if (skip)
                        /*
                         * When the device is establishing the connection
                         * for the first time - do not remap, it will be
                         * closed soon.
                         */
                        continue;

                rnbd_clt_info(dev, "session reconnected, remapping device\n");
                err = send_msg_open(dev, NO_WAIT);
                if (err) {
                        rnbd_clt_err(dev, "send_msg_open(): %d\n", err);
                        break;
                }
        }
        mutex_unlock(&sess->lock);
}

static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev)
{
        struct rnbd_clt_session *sess = priv;

        switch (ev) {
        case RTRS_CLT_LINK_EV_DISCONNECTED:
                set_dev_states_to_disconnected(sess);
                break;
        case RTRS_CLT_LINK_EV_RECONNECTED:
                remap_devs(sess);
                break;
        default:
                pr_err("Unknown session event received (%d), session: %s\n",
                       ev, sess->sessname);
        }
}

static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues)
{
        unsigned int cpu;
        struct rnbd_cpu_qlist *cpu_q;

        for_each_possible_cpu(cpu) {
                cpu_q = per_cpu_ptr(cpu_queues, cpu);

                cpu_q->cpu = cpu;
                INIT_LIST_HEAD(&cpu_q->requeue_list);
                spin_lock_init(&cpu_q->requeue_lock);
        }
}

static void destroy_mq_tags(struct rnbd_clt_session *sess)
{
        if (sess->tag_set.tags)
                blk_mq_free_tag_set(&sess->tag_set);
}

static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess)
{
        sess->rtrs_ready = true;
        wake_up_all(&sess->rtrs_waitq);
}

static void close_rtrs(struct rnbd_clt_session *sess)
{
        might_sleep();

        if (!IS_ERR_OR_NULL(sess->rtrs)) {
                rtrs_clt_close(sess->rtrs);
                sess->rtrs = NULL;
                wake_up_rtrs_waiters(sess);
        }
}

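/*
 * Also used on the alloc_sess() error path, so it must cope with a
 * partially initialised session (free_percpu() accepts NULL).
 */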
static void free_sess(struct rnbd_clt_session *sess)
{
        WARN_ON(!list_empty(&sess->devs_list));

        might_sleep();

        close_rtrs(sess);
        destroy_mq_tags(sess);
        if (!list_empty(&sess->list)) {
                mutex_lock(&sess_lock);
                list_del(&sess->list);
                mutex_unlock(&sess_lock);
        }
        free_percpu(sess->cpu_queues);
        free_percpu(sess->cpu_rr);
        mutex_destroy(&sess->lock);
        kfree(sess);
}

static struct rnbd_clt_session *alloc_sess(const char *sessname)
{
        struct rnbd_clt_session *sess;
        int err, cpu;

        sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE);
        if (!sess)
                return ERR_PTR(-ENOMEM);
        strlcpy(sess->sessname, sessname, sizeof(sess->sessname));
        atomic_set(&sess->busy, 0);
        mutex_init(&sess->lock);
        INIT_LIST_HEAD(&sess->devs_list);
        INIT_LIST_HEAD(&sess->list);
        bitmap_zero(sess->cpu_queues_bm, NR_CPUS);
        init_waitqueue_head(&sess->rtrs_waitq);
        refcount_set(&sess->refcount, 1);

        sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist);
        if (!sess->cpu_queues) {
                err = -ENOMEM;
                goto err;
        }
        rnbd_init_cpu_qlists(sess->cpu_queues);

        /*
         * That is a simple percpu variable which stores cpu indices,
         * which are incremented on each access.  We need that for the
         * sake of fairness to wake up queues in a round-robin manner.
         */
        sess->cpu_rr = alloc_percpu(int);
        if (!sess->cpu_rr) {
                err = -ENOMEM;
                goto err;
        }
        for_each_possible_cpu(cpu)
                *per_cpu_ptr(sess->cpu_rr, cpu) = cpu;

        return sess;

err:
        free_sess(sess);

        return ERR_PTR(err);
}

static int wait_for_rtrs_connection(struct rnbd_clt_session *sess)
{
        wait_event(sess->rtrs_waitq, sess->rtrs_ready);
        if (IS_ERR_OR_NULL(sess->rtrs))
                return -ECONNRESET;

        return 0;
}

static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess)
        __releases(&sess_lock)
        __acquires(&sess_lock)
{
        DEFINE_WAIT(wait);

        prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE);
        if (IS_ERR_OR_NULL(sess->rtrs)) {
                finish_wait(&sess->rtrs_waitq, &wait);
                return;
        }
        mutex_unlock(&sess_lock);
        /* Loop in the caller, see __find_and_get_sess().
         * We can't leave the mutex locked and call schedule(): that would
         * deadlock with a caller of free_sess(), which has just put the
         * last reference and is about to take the sess_lock in order to
         * delete the session from the list.
         */
        schedule();
        mutex_lock(&sess_lock);
}

static struct rnbd_clt_session *__find_and_get_sess(const char *sessname)
        __releases(&sess_lock)
        __acquires(&sess_lock)
{
        struct rnbd_clt_session *sess, *sn;
        int err;

again:
        list_for_each_entry_safe(sess, sn, &sess_list, list) {
                if (strcmp(sessname, sess->sessname))
                        continue;

                if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs))
                        /*
                         * No RTRS connection, session is dying.
                         */
                        continue;

                if (rnbd_clt_get_sess(sess)) {
                        /*
                         * Alive session is found, wait for RTRS connection.
                         */
                        mutex_unlock(&sess_lock);
                        err = wait_for_rtrs_connection(sess);
                        if (err)
                                rnbd_clt_put_sess(sess);
                        mutex_lock(&sess_lock);

                        if (err)
                                /* Session is dying, repeat the loop */
                                goto again;

                        return sess;
                }
                /*
                 * Ref is 0, session is dying, wait for RTRS disconnect
                 * in order to avoid session name clashes.
                 */
                wait_for_rtrs_disconnection(sess);
                /*
                 * RTRS is disconnected and soon the session will be freed,
                 * so repeat the loop.
                 */
                goto again;
        }

        return NULL;
}

static struct rnbd_clt_session *
find_or_create_sess(const char *sessname, bool *first)
{
        struct rnbd_clt_session *sess = NULL;

        mutex_lock(&sess_lock);
        sess = __find_and_get_sess(sessname);
        if (!sess) {
                sess = alloc_sess(sessname);
                if (IS_ERR(sess)) {
                        mutex_unlock(&sess_lock);
                        return sess;
                }
                list_add(&sess->list, &sess_list);
                *first = true;
        } else {
                *first = false;
        }
        mutex_unlock(&sess_lock);

        return sess;
}

static int rnbd_client_open(struct block_device *block_device, fmode_t mode)
{
        struct rnbd_clt_dev *dev = block_device->bd_disk->private_data;

        if (dev->read_only && (mode & FMODE_WRITE))
                return -EPERM;

        if (dev->dev_state == DEV_STATE_UNMAPPED ||
            !rnbd_clt_get_dev(dev))
                return -EIO;

        return 0;
}

static void rnbd_client_release(struct gendisk *gen, fmode_t mode)
{
        struct rnbd_clt_dev *dev = gen->private_data;

        rnbd_clt_put_dev(dev);
}

static int rnbd_client_getgeo(struct block_device *block_device,
                              struct hd_geometry *geo)
{
        u64 size;
        struct rnbd_clt_dev *dev;

        dev = block_device->bd_disk->private_data;
        size = dev->size * (dev->logical_block_size / SECTOR_SIZE);
        geo->cylinders  = size >> 6;    /* size/64 */
        geo->heads      = 4;
        geo->sectors    = 16;
        geo->start      = 0;

        return 0;
}

static const struct block_device_operations rnbd_client_ops = {
        .owner          = THIS_MODULE,
        .open           = rnbd_client_open,
        .release        = rnbd_client_release,
        .getgeo         = rnbd_client_getgeo
};

/* The amount of data that belongs to an I/O and the amount of data that
 * should be read or written to the disk (bi_size) can differ.
 *
 * E.g. when WRITE_SAME is used, only a small amount of data is
 * transferred that is then written repeatedly over a lot of sectors.
 *
 * Get the size of data to be transferred via RTRS by summing up the size
 * of the scatter-gather list entries.
 */
static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len)
{
        struct scatterlist *sg;
        size_t tsize = 0;
        int i;

        for_each_sg(sglist, sg, len, i)
                tsize += sg->length;
        return tsize;
}

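/*
 * Build an RNBD_MSG_IO header for the request and hand it together with
 * the mapped scatter-gather list over to RTRS.  msg_io_conf() completes
 * the request once the transfer has been confirmed.
 */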
static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev,
                                     struct request *rq,
                                     struct rnbd_iu *iu)
{
        struct rtrs_clt *rtrs = dev->sess->rtrs;
        struct rtrs_permit *permit = iu->permit;
        struct rnbd_msg_io msg;
        struct rtrs_clt_req_ops req_ops;
        unsigned int sg_cnt = 0;
        struct kvec vec;
        size_t size;
        int err;

        iu->rq          = rq;
        iu->dev         = dev;
        msg.sector      = cpu_to_le64(blk_rq_pos(rq));
        msg.bi_size     = cpu_to_le32(blk_rq_bytes(rq));
        msg.rw          = cpu_to_le32(rq_to_rnbd_flags(rq));
        msg.prio        = cpu_to_le16(req_get_ioprio(rq));

        /*
         * We only support discards with a single segment for now.
         * See queue limits.
         */
        if (req_op(rq) != REQ_OP_DISCARD)
                sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sglist);

        if (sg_cnt == 0)
                /* Do not forget to mark the end */
                sg_mark_end(&iu->sglist[0]);

        msg.hdr.type    = cpu_to_le16(RNBD_MSG_IO);
        msg.device_id   = cpu_to_le32(dev->device_id);

        vec = (struct kvec) {
                .iov_base = &msg,
                .iov_len  = sizeof(msg)
        };
        size = rnbd_clt_get_sg_size(iu->sglist, sg_cnt);
        req_ops = (struct rtrs_clt_req_ops) {
                .priv = iu,
                .conf_fn = msg_io_conf,
        };
        err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit,
                               &vec, 1, size, iu->sglist, sg_cnt);
        if (unlikely(err)) {
                rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n",
                                 err);
                return err;
        }

        return 0;
}

/**
 * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy
 * @dev:        Device to be checked
 * @q:          Queue to be added to the requeue list if required
 *
 * Description:
 *     If the session is busy, that means someone will requeue us when
 *     resources are freed.  If the session is not doing anything, the
 *     queue is not added to the list and %false is returned.
 */
static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev,
                                                struct rnbd_queue *q)
{
        struct rnbd_clt_session *sess = dev->sess;
        struct rnbd_cpu_qlist *cpu_q;
        unsigned long flags;
        bool added = true;
        bool need_set;

        cpu_q = get_cpu_ptr(sess->cpu_queues);
        spin_lock_irqsave(&cpu_q->requeue_lock, flags);

        if (likely(!test_and_set_bit_lock(0, &q->in_list))) {
                if (WARN_ON(!list_empty(&q->requeue_list)))
                        goto unlock;

                need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm);
                if (need_set) {
                        set_bit(cpu_q->cpu, sess->cpu_queues_bm);
                        /* Paired with rnbd_put_permit(). Set a bit first
                         * and then observe the busy counter.
                         */
                        smp_mb__before_atomic();
                }
                if (likely(atomic_read(&sess->busy))) {
                        list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
                } else {
                        /* Very unlikely, but possible: busy counter was
                         * observed as zero.  Drop all bits and return
                         * false to restart the queue by ourselves.
                         */
                        if (need_set)
                                clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
                        clear_bit_unlock(0, &q->in_list);
                        added = false;
                }
        }
unlock:
        spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
        put_cpu_ptr(sess->cpu_queues);

        return added;
}

static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev,
                                        struct blk_mq_hw_ctx *hctx,
                                        int delay)
{
        struct rnbd_queue *q = hctx->driver_data;

        if (delay != RNBD_DELAY_IFBUSY)
                blk_mq_delay_run_hw_queue(hctx, delay);
        else if (unlikely(!rnbd_clt_dev_add_to_requeue(dev, q)))
                /*
                 * If the session is not busy we have to restart
                 * the queue ourselves.
                 */
                blk_mq_delay_run_hw_queue(hctx, 10/*ms*/);
}

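/*
 * ->queue_rq: grab a permit without waiting.  If none is available the
 * request is backed off with BLK_STS_RESOURCE: either the requeue
 * machinery above reruns the queue once the session stops being busy,
 * or a delayed queue run restarts it.
 */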
static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx,
                                   const struct blk_mq_queue_data *bd)
{
        struct request *rq = bd->rq;
        struct rnbd_clt_dev *dev = rq->rq_disk->private_data;
        struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
        int err;

        if (unlikely(dev->dev_state != DEV_STATE_MAPPED))
                return BLK_STS_IOERR;

        iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON,
                                      RTRS_PERMIT_NOWAIT);
        if (unlikely(!iu->permit)) {
                rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY);
                return BLK_STS_RESOURCE;
        }

        blk_mq_start_request(rq);
        err = rnbd_client_xfer_request(dev, rq, iu);
        if (likely(err == 0))
                return BLK_STS_OK;
        if (unlikely(err == -EAGAIN || err == -ENOMEM)) {
                rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
                rnbd_put_permit(dev->sess, iu->permit);
                return BLK_STS_RESOURCE;
        }

        rnbd_put_permit(dev->sess, iu->permit);
        return BLK_STS_IOERR;
}

static int rnbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
                              unsigned int hctx_idx, unsigned int numa_node)
{
        struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);

        sg_init_table(iu->sglist, BMAX_SEGMENTS);
        return 0;
}

static struct blk_mq_ops rnbd_mq_ops = {
        .queue_rq       = rnbd_queue_rq,
        .init_request   = rnbd_init_request,
        .complete       = rnbd_softirq_done_fn,
};

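/*
 * One tag set per session: all devices mapped over the session share the
 * tags (BLK_MQ_F_TAG_QUEUE_SHARED), and the queue depth mirrors the RTRS
 * queue depth, i.e. the number of available permits.
 */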
static int setup_mq_tags(struct rnbd_clt_session *sess)
{
        struct blk_mq_tag_set *tag_set = &sess->tag_set;

        memset(tag_set, 0, sizeof(*tag_set));
        tag_set->ops            = &rnbd_mq_ops;
        tag_set->queue_depth    = sess->queue_depth;
        tag_set->numa_node      = NUMA_NO_NODE;
        tag_set->flags          = BLK_MQ_F_SHOULD_MERGE |
                                  BLK_MQ_F_TAG_QUEUE_SHARED;
        tag_set->cmd_size       = sizeof(struct rnbd_iu);
        tag_set->nr_hw_queues   = num_online_cpus();

        return blk_mq_alloc_tag_set(tag_set);
}

static struct rnbd_clt_session *
find_and_get_or_create_sess(const char *sessname,
                            const struct rtrs_addr *paths,
                            size_t path_cnt, u16 port_nr)
{
        struct rnbd_clt_session *sess;
        struct rtrs_attrs attrs;
        int err;
        bool first;
        struct rtrs_clt_ops rtrs_ops;

        sess = find_or_create_sess(sessname, &first);
        if (sess == ERR_PTR(-ENOMEM))
                return ERR_PTR(-ENOMEM);
        else if (!first)
                return sess;

        rtrs_ops = (struct rtrs_clt_ops) {
                .priv = sess,
                .link_ev = rnbd_clt_link_ev,
        };
        /*
         * Nothing was found, establish rtrs connection and proceed further.
         */
        sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
                                   paths, path_cnt, port_nr,
                                   sizeof(struct rnbd_iu),
                                   RECONNECT_DELAY, BMAX_SEGMENTS,
                                   BLK_MAX_SEGMENT_SIZE,
                                   MAX_RECONNECTS);
        if (IS_ERR(sess->rtrs)) {
                err = PTR_ERR(sess->rtrs);
                goto wake_up_and_put;
        }
        rtrs_clt_query(sess->rtrs, &attrs);
        sess->max_io_size = attrs.max_io_size;
        sess->queue_depth = attrs.queue_depth;

        err = setup_mq_tags(sess);
        if (err)
                goto close_rtrs;

        err = send_msg_sess_info(sess, WAIT);
        if (err)
                goto close_rtrs;

        wake_up_rtrs_waiters(sess);

        return sess;

close_rtrs:
        close_rtrs(sess);
put_sess:
        rnbd_clt_put_sess(sess);

        return ERR_PTR(err);

wake_up_and_put:
        wake_up_rtrs_waiters(sess);
        goto put_sess;
}

static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev,
                                       struct rnbd_queue *q,
                                       struct blk_mq_hw_ctx *hctx)
{
        INIT_LIST_HEAD(&q->requeue_list);
        q->dev  = dev;
        q->hctx = hctx;
}

static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev)
{
        int i;
        struct blk_mq_hw_ctx *hctx;
        struct rnbd_queue *q;

        queue_for_each_hw_ctx(dev->queue, hctx, i) {
                q = &dev->hw_queues[i];
                rnbd_init_hw_queue(dev, q, hctx);
                hctx->driver_data = q;
        }
}

static int setup_mq_dev(struct rnbd_clt_dev *dev)
{
        dev->queue = blk_mq_init_queue(&dev->sess->tag_set);
        if (IS_ERR(dev->queue)) {
                rnbd_clt_err(dev, "Initializing multiqueue queue failed, err: %ld\n",
                              PTR_ERR(dev->queue));
                return PTR_ERR(dev->queue);
        }
        rnbd_init_mq_hw_queues(dev);
        return 0;
}

static void setup_request_queue(struct rnbd_clt_dev *dev)
{
        blk_queue_logical_block_size(dev->queue, dev->logical_block_size);
        blk_queue_physical_block_size(dev->queue, dev->physical_block_size);
        blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors);
        blk_queue_max_write_same_sectors(dev->queue,
                                         dev->max_write_same_sectors);

        /*
         * We don't support discards to "discontiguous" segments
         * in one request.
         */
        blk_queue_max_discard_segments(dev->queue, 1);

        blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors);
        dev->queue->limits.discard_granularity  = dev->discard_granularity;
        dev->queue->limits.discard_alignment    = dev->discard_alignment;
        if (dev->max_discard_sectors)
                blk_queue_flag_set(QUEUE_FLAG_DISCARD, dev->queue);
        if (dev->secure_discard)
                blk_queue_flag_set(QUEUE_FLAG_SECERASE, dev->queue);

        blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue);
        blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue);
        blk_queue_max_segments(dev->queue, dev->max_segments);
        blk_queue_io_opt(dev->queue, dev->sess->max_io_size);
        blk_queue_virt_boundary(dev->queue, SZ_4K - 1);
        blk_queue_write_cache(dev->queue, true, true);
        dev->queue->queuedata = dev;
}

static void rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx)
{
        dev->gd->major          = rnbd_client_major;
        dev->gd->first_minor    = idx << RNBD_PART_BITS;
        dev->gd->fops           = &rnbd_client_ops;
        dev->gd->queue          = dev->queue;
        dev->gd->private_data   = dev;
        snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d",
                 idx);
        pr_debug("disk_name=%s, capacity=%zu\n",
                 dev->gd->disk_name,
                 dev->nsectors * (dev->logical_block_size / SECTOR_SIZE)
                 );

        set_capacity(dev->gd, dev->nsectors);

        if (dev->access_mode == RNBD_ACCESS_RO) {
                dev->read_only = true;
                set_disk_ro(dev->gd, true);
        } else {
                dev->read_only = false;
        }

        if (!dev->rotational)
                blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue);
}

static int rnbd_client_setup_device(struct rnbd_clt_session *sess,
                                     struct rnbd_clt_dev *dev, int idx)
{
        int err;

        dev->size = dev->nsectors * dev->logical_block_size;

        err = setup_mq_dev(dev);
        if (err)
                return err;

        setup_request_queue(dev);

        dev->gd = alloc_disk_node(1 << RNBD_PART_BITS, NUMA_NO_NODE);
        if (!dev->gd) {
                blk_cleanup_queue(dev->queue);
                return -ENOMEM;
        }

        rnbd_clt_setup_gen_disk(dev, idx);

        return 0;
}

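/*
 * Allocate and initialise a client device.  A reference on the session
 * is taken here and dropped again in rnbd_clt_put_dev() on the last
 * device reference.
 */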
static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess,
                                      enum rnbd_access_mode access_mode,
                                      const char *pathname)
{
        struct rnbd_clt_dev *dev;
        int ret;

        dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE);
        if (!dev)
                return ERR_PTR(-ENOMEM);

        dev->hw_queues = kcalloc(nr_cpu_ids, sizeof(*dev->hw_queues),
                                 GFP_KERNEL);
        if (!dev->hw_queues) {
                ret = -ENOMEM;
                goto out_alloc;
        }

        mutex_lock(&ida_lock);
        ret = ida_simple_get(&index_ida, 0, 1 << (MINORBITS - RNBD_PART_BITS),
                             GFP_KERNEL);
        mutex_unlock(&ida_lock);
        if (ret < 0) {
                pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n",
                       pathname, sess->sessname, ret);
                goto out_queues;
        }
        dev->clt_device_id      = ret;
        dev->sess               = sess;
        dev->access_mode        = access_mode;
        strlcpy(dev->pathname, pathname, sizeof(dev->pathname));
        mutex_init(&dev->lock);
        refcount_set(&dev->refcount, 1);
        dev->dev_state = DEV_STATE_INIT;

        /*
         * We are called from a sysfs entry here, thus clt-sysfs is
         * responsible that the session will not disappear.
         */
        WARN_ON(!rnbd_clt_get_sess(sess));

        return dev;

out_queues:
        kfree(dev->hw_queues);
out_alloc:
        kfree(dev);
        return ERR_PTR(ret);
}

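/* Caller must hold sess_lock. */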
static bool __exists_dev(const char *pathname)
{
        struct rnbd_clt_session *sess;
        struct rnbd_clt_dev *dev;
        bool found = false;

        list_for_each_entry(sess, &sess_list, list) {
                mutex_lock(&sess->lock);
                list_for_each_entry(dev, &sess->devs_list, list) {
                        if (!strncmp(dev->pathname, pathname,
                                     sizeof(dev->pathname))) {
                                found = true;
                                break;
                        }
                }
                mutex_unlock(&sess->lock);
                if (found)
                        break;
        }

        return found;
}

static bool exists_devpath(const char *pathname)
{
        bool found;

        mutex_lock(&sess_lock);
        found = __exists_dev(pathname);
        mutex_unlock(&sess_lock);

        return found;
}

static bool insert_dev_if_not_exists_devpath(const char *pathname,
                                             struct rnbd_clt_session *sess,
                                             struct rnbd_clt_dev *dev)
{
        bool found;

        mutex_lock(&sess_lock);
        found = __exists_dev(pathname);
        if (!found) {
                mutex_lock(&sess->lock);
                list_add_tail(&dev->list, &sess->devs_list);
                mutex_unlock(&sess->lock);
        }
        mutex_unlock(&sess_lock);

        return found;
}

static void delete_dev(struct rnbd_clt_dev *dev)
{
        struct rnbd_clt_session *sess = dev->sess;

        mutex_lock(&sess->lock);
        list_del(&dev->list);
        mutex_unlock(&sess->lock);
}

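/*
 * Map a remote device: find or create the session, register the device
 * under its path name, open it on the server and, once the attributes
 * from the open response are known, set up the request queue and gendisk.
 */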
struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname,
                                           struct rtrs_addr *paths,
                                           size_t path_cnt, u16 port_nr,
                                           const char *pathname,
                                           enum rnbd_access_mode access_mode)
{
        struct rnbd_clt_session *sess;
        struct rnbd_clt_dev *dev;
        int ret;

        if (exists_devpath(pathname))
                return ERR_PTR(-EEXIST);

        sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr);
        if (IS_ERR(sess))
                return ERR_CAST(sess);

        dev = init_dev(sess, access_mode, pathname);
        if (IS_ERR(dev)) {
                pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n",
                       pathname, sess->sessname, PTR_ERR(dev));
                ret = PTR_ERR(dev);
                goto put_sess;
        }
        if (insert_dev_if_not_exists_devpath(pathname, sess, dev)) {
                ret = -EEXIST;
                goto put_dev;
        }
        ret = send_msg_open(dev, WAIT);
        if (ret) {
                rnbd_clt_err(dev,
                              "map_device: failed, can't open remote device, err: %d\n",
                              ret);
                goto del_dev;
        }
        mutex_lock(&dev->lock);
        pr_debug("Opened remote device: session=%s, path='%s'\n",
                 sess->sessname, pathname);
        ret = rnbd_client_setup_device(sess, dev, dev->clt_device_id);
        if (ret) {
                rnbd_clt_err(dev,
                              "map_device: Failed to configure device, err: %d\n",
                              ret);
                mutex_unlock(&dev->lock);
                goto send_close;
        }

        rnbd_clt_info(dev,
                       "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_write_same_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, rotational: %d)\n",
                       dev->gd->disk_name, dev->nsectors,
                       dev->logical_block_size, dev->physical_block_size,
                       dev->max_write_same_sectors, dev->max_discard_sectors,
                       dev->discard_granularity, dev->discard_alignment,
                       dev->secure_discard, dev->max_segments,
                       dev->max_hw_sectors, dev->rotational);

        mutex_unlock(&dev->lock);

        add_disk(dev->gd);
        rnbd_clt_put_sess(sess);

        return dev;

send_close:
        send_msg_close(dev, dev->device_id, WAIT);
del_dev:
        delete_dev(dev);
put_dev:
        rnbd_clt_put_dev(dev);
put_sess:
        rnbd_clt_put_sess(sess);

        return ERR_PTR(ret);
}

static void destroy_gen_disk(struct rnbd_clt_dev *dev)
{
        del_gendisk(dev->gd);
        blk_cleanup_queue(dev->queue);
        put_disk(dev->gd);
}

static void destroy_sysfs(struct rnbd_clt_dev *dev,
                          const struct attribute *sysfs_self)
{
        rnbd_clt_remove_dev_symlink(dev);
        if (dev->kobj.state_initialized) {
                if (sysfs_self)
                        /* To avoid a deadlock, remove the self entry first */
                        sysfs_remove_file_self(&dev->kobj, sysfs_self);
                kobject_del(&dev->kobj);
                kobject_put(&dev->kobj);
        }
}

int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force,
                           const struct attribute *sysfs_self)
{
        struct rnbd_clt_session *sess = dev->sess;
        int refcount, ret = 0;
        bool was_mapped;

        mutex_lock(&dev->lock);
        if (dev->dev_state == DEV_STATE_UNMAPPED) {
                rnbd_clt_info(dev, "Device is already being unmapped\n");
                ret = -EALREADY;
                goto err;
        }
        refcount = refcount_read(&dev->refcount);
        if (!force && refcount > 1) {
                rnbd_clt_err(dev,
                              "Closing device failed, device is in use, (%d device users)\n",
                              refcount - 1);
                ret = -EBUSY;
                goto err;
        }
        was_mapped = (dev->dev_state == DEV_STATE_MAPPED);
        dev->dev_state = DEV_STATE_UNMAPPED;
        mutex_unlock(&dev->lock);

        delete_dev(dev);
        destroy_sysfs(dev, sysfs_self);
        destroy_gen_disk(dev);
        if (was_mapped && sess->rtrs)
                send_msg_close(dev, dev->device_id, WAIT);

        rnbd_clt_info(dev, "Device is unmapped\n");

        /* Likely last reference put */
        rnbd_clt_put_dev(dev);

        /*
         * Here the device and the session may already have vanished!
         */

        return 0;
err:
        mutex_unlock(&dev->lock);

        return ret;
}

int rnbd_clt_remap_device(struct rnbd_clt_dev *dev)
{
        int err;

        mutex_lock(&dev->lock);
        if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED)
                err = 0;
        else if (dev->dev_state == DEV_STATE_UNMAPPED)
                err = -ENODEV;
        else if (dev->dev_state == DEV_STATE_MAPPED)
                err = -EALREADY;
        else
                err = -EBUSY;
        mutex_unlock(&dev->lock);
        if (!err) {
                rnbd_clt_info(dev, "Remapping device.\n");
                err = send_msg_open(dev, WAIT);
                if (err)
                        rnbd_clt_err(dev, "remap_device: %d\n", err);
        }

        return err;
}

static void unmap_device_work(struct work_struct *work)
{
        struct rnbd_clt_dev *dev;

        dev = container_of(work, typeof(*dev), unmap_on_rmmod_work);
        rnbd_clt_unmap_device(dev, true, NULL);
}

static void rnbd_destroy_sessions(void)
{
        struct rnbd_clt_session *sess, *sn;
        struct rnbd_clt_dev *dev, *tn;

        /* Firstly forbid access through the sysfs interface */
        rnbd_clt_destroy_default_group();
        rnbd_clt_destroy_sysfs_files();

        /*
         * At this point there is no concurrent access to the session and
         * device lists:
         *   1. New sessions or devices can't be created - session sysfs
         *      files are removed.
         *   2. Devices or sessions can't be removed - the module reference
         *      is taken into account in the unmap device sysfs callback.
         *   3. No IO requests are inflight - each file open of the
         *      block_dev increases the module reference in get_disk().
         *
         * But there can still be inflight user requests, which are sent by
         * the asynchronous send_msg_*() functions, thus before unmapping
         * devices the RTRS session must be explicitly closed.
         */

        list_for_each_entry_safe(sess, sn, &sess_list, list) {
                WARN_ON(!rnbd_clt_get_sess(sess));
                close_rtrs(sess);
                list_for_each_entry_safe(dev, tn, &sess->devs_list, list) {
                        /*
                         * Here unmap happens in parallel for only one
                         * reason: blk_cleanup_queue() takes around half
                         * a second, so on a huge number of devices the
                         * whole module unload procedure takes minutes.
                         */
                        INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work);
                        queue_work(system_long_wq, &dev->unmap_on_rmmod_work);
                }
                rnbd_clt_put_sess(sess);
        }
        /* Wait for all scheduled unmap works */
        flush_workqueue(system_long_wq);
        WARN_ON(!list_empty(&sess_list));
}

static int __init rnbd_client_init(void)
{
        int err = 0;

        BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4);
        BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36);
        BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36);
        BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264);
        BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8);
        BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56);
        rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd");
        if (rnbd_client_major <= 0) {
                pr_err("Failed to load module, block device registration failed\n");
                return -EBUSY;
        }

        err = rnbd_clt_create_sysfs_files();
        if (err) {
                pr_err("Failed to load module, creating sysfs device files failed, err: %d\n",
                       err);
                unregister_blkdev(rnbd_client_major, "rnbd");
        }

        return err;
}

static void __exit rnbd_client_exit(void)
{
        rnbd_destroy_sessions();
        unregister_blkdev(rnbd_client_major, "rnbd");
        ida_destroy(&index_ida);
}

module_init(rnbd_client_init);
module_exit(rnbd_client_exit);