linux-2.6-microblaze.git: drivers/infiniband/ulp/rtrs/rtrs-clt.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  * RDMA Transport Layer
4  *
5  * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6  * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7  * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8  */
9
10 #undef pr_fmt
11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12
13 #include <linux/module.h>
14 #include <linux/rculist.h>
15 #include <linux/random.h>
16
17 #include "rtrs-clt.h"
18 #include "rtrs-log.h"
19
20 #define RTRS_CONNECT_TIMEOUT_MS 30000
21 /*
22  * Wait a bit before trying to reconnect after a failure
23  * in order to give the server time to finish its cleanup, which
24  * otherwise leads to "false positive" failed reconnect attempts
25  */
26 #define RTRS_RECONNECT_BACKOFF 1000
27 /*
28  * Wait for additional random time between 0 and 8 seconds
29  * before starting to reconnect to avoid clients reconnecting
30  * all at once in case of a major network outage
31  */
32 #define RTRS_RECONNECT_SEED 8
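/*
 * Illustrative only: with a (hypothetical) reconnect_delay_sec of 30,
 * rtrs_rdma_error_recovery() below queues the reconnect work after
 * msecs_to_jiffies(30 * 1000 + prandom_u32() % RTRS_RECONNECT_SEED).
 */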
33
34 #define FIRST_CONN 0x01
35 /* limit to 128 * 4k = 512k max IO */
36 #define RTRS_MAX_SEGMENTS          128
37
38 MODULE_DESCRIPTION("RDMA Transport Client");
39 MODULE_LICENSE("GPL");
40
41 static const struct rtrs_rdma_dev_pd_ops dev_pd_ops;
42 static struct rtrs_rdma_dev_pd dev_pd = {
43         .ops = &dev_pd_ops
44 };
45
46 static struct workqueue_struct *rtrs_wq;
47 static struct class *rtrs_clt_dev_class;
48
49 static inline bool rtrs_clt_is_connected(const struct rtrs_clt *clt)
50 {
51         struct rtrs_clt_sess *sess;
52         bool connected = false;
53
54         rcu_read_lock();
55         list_for_each_entry_rcu(sess, &clt->paths_list, s.entry)
56                 connected |= READ_ONCE(sess->state) == RTRS_CLT_CONNECTED;
57         rcu_read_unlock();
58
59         return connected;
60 }
61
62 static struct rtrs_permit *
63 __rtrs_get_permit(struct rtrs_clt *clt, enum rtrs_clt_con_type con_type)
64 {
65         size_t max_depth = clt->queue_depth;
66         struct rtrs_permit *permit;
67         int bit;
68
69         /*
70          * Adapted from null_blk get_tag(). Callers from different cpus may
71          * grab the same bit, since find_first_zero_bit is not atomic.
72          * But then the test_and_set_bit_lock will fail for all the
73          * callers but one, so that they will loop again.
74          * This way an explicit spinlock is not required.
75          */
76         do {
77                 bit = find_first_zero_bit(clt->permits_map, max_depth);
78                 if (bit >= max_depth)
79                         return NULL;
80         } while (test_and_set_bit_lock(bit, clt->permits_map));
81
82         permit = get_permit(clt, bit);
83         WARN_ON(permit->mem_id != bit);
84         permit->cpu_id = raw_smp_processor_id();
85         permit->con_type = con_type;
86
87         return permit;
88 }
89
90 static inline void __rtrs_put_permit(struct rtrs_clt *clt,
91                                       struct rtrs_permit *permit)
92 {
93         clear_bit_unlock(permit->mem_id, clt->permits_map);
94 }
95
96 /**
97  * rtrs_clt_get_permit() - allocates permit for future RDMA operation
98  * @clt:        Current session
99  * @con_type:   Type of connection to use with the permit
100  * @can_wait:   Wait type
101  *
102  * Description:
103  *    Allocates a permit for the following RDMA operation.  The permit is
104  *    used to preallocate all resources and to propagate memory pressure
105  *    to upper layers early.
106  *
107  * Context:
108  *    Can sleep if @can_wait == RTRS_PERMIT_WAIT
109  */
110 struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt *clt,
111                                           enum rtrs_clt_con_type con_type,
112                                           enum wait_type can_wait)
113 {
114         struct rtrs_permit *permit;
115         DEFINE_WAIT(wait);
116
117         permit = __rtrs_get_permit(clt, con_type);
118         if (permit || !can_wait)
119                 return permit;
120
121         do {
122                 prepare_to_wait(&clt->permits_wait, &wait,
123                                 TASK_UNINTERRUPTIBLE);
124                 permit = __rtrs_get_permit(clt, con_type);
125                 if (permit)
126                         break;
127
128                 io_schedule();
129         } while (1);
130
131         finish_wait(&clt->permits_wait, &wait);
132
133         return permit;
134 }
135 EXPORT_SYMBOL(rtrs_clt_get_permit);
136
137 /**
138  * rtrs_clt_put_permit() - puts allocated permit
139  * @clt:        Current session
140  * @permit:     Permit to be freed
141  *
142  * Context:
143  *    Does not matter
144  */
145 void rtrs_clt_put_permit(struct rtrs_clt *clt, struct rtrs_permit *permit)
146 {
147         if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map)))
148                 return;
149
150         __rtrs_put_permit(clt, permit);
151
152         /*
153          * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list
154          * before calling schedule(). So if rtrs_clt_get_permit() is sleeping
155          * it must have added itself to &clt->permits_wait before
156          * __rtrs_put_permit() finished.
157          * Hence it is safe to guard wake_up() with a waitqueue_active() test.
158          */
159         if (waitqueue_active(&clt->permits_wait))
160                 wake_up(&clt->permits_wait);
161 }
162 EXPORT_SYMBOL(rtrs_clt_put_permit);
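/*
 * Illustrative usage sketch (hypothetical caller, not part of this file):
 *
 *	permit = rtrs_clt_get_permit(clt, RTRS_IO_CON, RTRS_PERMIT_WAIT);
 *	if (!permit)
 *		return -EBUSY;
 *	... issue the RDMA request that consumes the permit ...
 *	rtrs_clt_put_permit(clt, permit);
 *
 * With RTRS_PERMIT_WAIT the get may sleep until a permit is released; with a
 * non-waiting type it returns NULL when the queue depth is exhausted.
 */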
163
164 /**
165  * rtrs_permit_to_clt_con() - returns the RDMA connection pointer for a permit
166  * @sess: client session pointer
167  * @permit: permit for the allocation of the RDMA buffer
168  * Note:
169  *     IO connections start from 1.
170  *     Connection 0 is reserved for user messages.
171  */
172 static
173 struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_sess *sess,
174                                             struct rtrs_permit *permit)
175 {
176         int id = 0;
177
178         if (permit->con_type == RTRS_IO_CON)
179                 id = (permit->cpu_id % (sess->s.irq_con_num - 1)) + 1;
180
181         return to_clt_con(sess->s.con[id]);
182 }
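/*
 * Illustrative only: with a (hypothetical) s.irq_con_num of 8 (seven IO
 * connections plus the user-message connection 0), an RTRS_IO_CON permit
 * taken on CPU 11 maps to connection (11 % 7) + 1 == 5.
 */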
183
184 /**
185  * rtrs_clt_change_state() - change the session state through session state
186  * machine.
187  *
188  * @sess: client session to change the state of.
189  * @new_state: state to change to.
190  *
191  * Returns true if the session state was changed to the new state, otherwise returns false.
192  *
193  * Locks:
194  * state_wq lock must be held.
195  */
196 static bool rtrs_clt_change_state(struct rtrs_clt_sess *sess,
197                                      enum rtrs_clt_state new_state)
198 {
199         enum rtrs_clt_state old_state;
200         bool changed = false;
201
202         lockdep_assert_held(&sess->state_wq.lock);
203
204         old_state = sess->state;
205         switch (new_state) {
206         case RTRS_CLT_CONNECTING:
207                 switch (old_state) {
208                 case RTRS_CLT_RECONNECTING:
209                         changed = true;
210                         fallthrough;
211                 default:
212                         break;
213                 }
214                 break;
215         case RTRS_CLT_RECONNECTING:
216                 switch (old_state) {
217                 case RTRS_CLT_CONNECTED:
218                 case RTRS_CLT_CONNECTING_ERR:
219                 case RTRS_CLT_CLOSED:
220                         changed = true;
221                         fallthrough;
222                 default:
223                         break;
224                 }
225                 break;
226         case RTRS_CLT_CONNECTED:
227                 switch (old_state) {
228                 case RTRS_CLT_CONNECTING:
229                         changed = true;
230                         fallthrough;
231                 default:
232                         break;
233                 }
234                 break;
235         case RTRS_CLT_CONNECTING_ERR:
236                 switch (old_state) {
237                 case RTRS_CLT_CONNECTING:
238                         changed = true;
239                         fallthrough;
240                 default:
241                         break;
242                 }
243                 break;
244         case RTRS_CLT_CLOSING:
245                 switch (old_state) {
246                 case RTRS_CLT_CONNECTING:
247                 case RTRS_CLT_CONNECTING_ERR:
248                 case RTRS_CLT_RECONNECTING:
249                 case RTRS_CLT_CONNECTED:
250                         changed = true;
251                         fallthrough;
252                 default:
253                         break;
254                 }
255                 break;
256         case RTRS_CLT_CLOSED:
257                 switch (old_state) {
258                 case RTRS_CLT_CLOSING:
259                         changed = true;
260                         fallthrough;
261                 default:
262                         break;
263                 }
264                 break;
265         case RTRS_CLT_DEAD:
266                 switch (old_state) {
267                 case RTRS_CLT_CLOSED:
268                         changed = true;
269                         fallthrough;
270                 default:
271                         break;
272                 }
273                 break;
274         default:
275                 break;
276         }
277         if (changed) {
278                 sess->state = new_state;
279                 wake_up_locked(&sess->state_wq);
280         }
281
282         return changed;
283 }
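/*
 * Summary of the transitions accepted by rtrs_clt_change_state() above
 * (new state <- allowed old states, RTRS_CLT_ prefixes dropped):
 *
 *	CONNECTING     <- RECONNECTING
 *	RECONNECTING   <- CONNECTED, CONNECTING_ERR, CLOSED
 *	CONNECTED      <- CONNECTING
 *	CONNECTING_ERR <- CONNECTING
 *	CLOSING        <- CONNECTING, CONNECTING_ERR, RECONNECTING, CONNECTED
 *	CLOSED         <- CLOSING
 *	DEAD           <- CLOSED
 */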
284
285 static bool rtrs_clt_change_state_from_to(struct rtrs_clt_sess *sess,
286                                            enum rtrs_clt_state old_state,
287                                            enum rtrs_clt_state new_state)
288 {
289         bool changed = false;
290
291         spin_lock_irq(&sess->state_wq.lock);
292         if (sess->state == old_state)
293                 changed = rtrs_clt_change_state(sess, new_state);
294         spin_unlock_irq(&sess->state_wq.lock);
295
296         return changed;
297 }
298
299 static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con)
300 {
301         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
302
303         if (rtrs_clt_change_state_from_to(sess,
304                                            RTRS_CLT_CONNECTED,
305                                            RTRS_CLT_RECONNECTING)) {
306                 struct rtrs_clt *clt = sess->clt;
307                 unsigned int delay_ms;
308
309                 /*
310                  * Normal scenario, reconnect if we were successfully connected
311                  */
312                 delay_ms = clt->reconnect_delay_sec * 1000;
313                 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
314                                    msecs_to_jiffies(delay_ms +
315                                                     prandom_u32() % RTRS_RECONNECT_SEED));
316         } else {
317                 /*
318                  * An error can only happen while establishing a new connection,
319                  * so notify the waiter with the error state; the waiter is
320                  * responsible for cleaning up the rest and reconnecting if needed.
321                  */
322                 rtrs_clt_change_state_from_to(sess,
323                                                RTRS_CLT_CONNECTING,
324                                                RTRS_CLT_CONNECTING_ERR);
325         }
326 }
327
328 static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc)
329 {
330         struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
331
332         if (wc->status != IB_WC_SUCCESS) {
333                 rtrs_err(con->c.sess, "Failed IB_WR_REG_MR: %s\n",
334                           ib_wc_status_msg(wc->status));
335                 rtrs_rdma_error_recovery(con);
336         }
337 }
338
339 static struct ib_cqe fast_reg_cqe = {
340         .done = rtrs_clt_fast_reg_done
341 };
342
343 static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
344                               bool notify, bool can_wait);
345
346 static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
347 {
348         struct rtrs_clt_io_req *req =
349                 container_of(wc->wr_cqe, typeof(*req), inv_cqe);
350         struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
351
352         if (wc->status != IB_WC_SUCCESS) {
353                 rtrs_err(con->c.sess, "Failed IB_WR_LOCAL_INV: %s\n",
354                           ib_wc_status_msg(wc->status));
355                 rtrs_rdma_error_recovery(con);
356         }
357         req->need_inv = false;
358         if (req->need_inv_comp)
359                 complete(&req->inv_comp);
360         else
361                 /* Complete request from INV callback */
362                 complete_rdma_req(req, req->inv_errno, true, false);
363 }
364
365 static int rtrs_inv_rkey(struct rtrs_clt_io_req *req)
366 {
367         struct rtrs_clt_con *con = req->con;
368         struct ib_send_wr wr = {
369                 .opcode             = IB_WR_LOCAL_INV,
370                 .wr_cqe             = &req->inv_cqe,
371                 .send_flags         = IB_SEND_SIGNALED,
372                 .ex.invalidate_rkey = req->mr->rkey,
373         };
374         req->inv_cqe.done = rtrs_clt_inv_rkey_done;
375
376         return ib_post_send(con->c.qp, &wr, NULL);
377 }
378
379 static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
380                               bool notify, bool can_wait)
381 {
382         struct rtrs_clt_con *con = req->con;
383         struct rtrs_clt_sess *sess;
384         int err;
385
386         if (WARN_ON(!req->in_use))
387                 return;
388         if (WARN_ON(!req->con))
389                 return;
390         sess = to_clt_sess(con->c.sess);
391
392         if (req->sg_cnt) {
393                 if (req->dir == DMA_FROM_DEVICE && req->need_inv) {
394                         /*
395                          * We are here to invalidate read requests
396                          * ourselves.  In the normal scenario the server
397                          * sends an INV for every read request, so if we
398                          * got here one of two things happened:
399                          *
400                          *    1.  this is a failover, with errno != 0
401                          *        and can_wait == 1, or
402                          *
403                          *    2.  something went badly wrong and the
404                          *        server never sent the INV, so we
405                          *        must do it ourselves.
406                          */
407
408                         if (can_wait) {
409                                 req->need_inv_comp = true;
410                         } else {
411                                 /* This should be IO path, so always notify */
412                                 WARN_ON(!notify);
413                                 /* Save errno for INV callback */
414                                 req->inv_errno = errno;
415                         }
416
417                         refcount_inc(&req->ref);
418                         err = rtrs_inv_rkey(req);
419                         if (err) {
420                                 rtrs_err(con->c.sess, "Send INV WR key=%#x: %d\n",
421                                           req->mr->rkey, err);
422                         } else if (can_wait) {
423                                 wait_for_completion(&req->inv_comp);
424                         } else {
425                                 /*
426                                  * Something went wrong, so request will be
427                                  * completed from INV callback.
428                                  */
429                                 WARN_ON_ONCE(1);
430
431                                 return;
432                         }
433                         if (!refcount_dec_and_test(&req->ref))
434                                 return;
435                 }
436                 ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
437                                 req->sg_cnt, req->dir);
438         }
439         if (!refcount_dec_and_test(&req->ref))
440                 return;
441         if (req->mp_policy == MP_POLICY_MIN_INFLIGHT)
442                 atomic_dec(&sess->stats->inflight);
443
444         req->in_use = false;
445         req->con = NULL;
446
447         if (errno) {
448                 rtrs_err_rl(con->c.sess, "IO request failed: error=%d path=%s [%s:%u] notify=%d\n",
449                             errno, kobject_name(&sess->kobj), sess->hca_name,
450                             sess->hca_port, notify);
451         }
452
453         if (notify)
454                 req->conf(req->priv, errno);
455 }
456
457 static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
458                                 struct rtrs_clt_io_req *req,
459                                 struct rtrs_rbuf *rbuf, u32 off,
460                                 u32 imm, struct ib_send_wr *wr)
461 {
462         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
463         enum ib_send_flags flags;
464         struct ib_sge sge;
465
466         if (!req->sg_size) {
467                 rtrs_wrn(con->c.sess,
468                          "Doing RDMA Write failed, no data supplied\n");
469                 return -EINVAL;
470         }
471
472         /* user data and user message in the first list element */
473         sge.addr   = req->iu->dma_addr;
474         sge.length = req->sg_size;
475         sge.lkey   = sess->s.dev->ib_pd->local_dma_lkey;
476
477         /*
478          * From time to time we have to post signalled sends,
479          * or the send queue will fill up and only a QP reset can help.
480          */
481         flags = atomic_inc_return(&con->c.wr_cnt) % sess->s.signal_interval ?
482                         0 : IB_SEND_SIGNALED;
483
484         ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
485                                       req->sg_size, DMA_TO_DEVICE);
486
487         return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
488                                             rbuf->rkey, rbuf->addr + off,
489                                             imm, flags, wr, NULL);
490 }
491
492 static void process_io_rsp(struct rtrs_clt_sess *sess, u32 msg_id,
493                            s16 errno, bool w_inval)
494 {
495         struct rtrs_clt_io_req *req;
496
497         if (WARN_ON(msg_id >= sess->queue_depth))
498                 return;
499
500         req = &sess->reqs[msg_id];
501         /* Drop need_inv if server responded with send with invalidation */
502         req->need_inv &= !w_inval;
503         complete_rdma_req(req, errno, true, false);
504 }
505
506 static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc)
507 {
508         struct rtrs_iu *iu;
509         int err;
510         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
511
512         WARN_ON((sess->flags & RTRS_MSG_NEW_RKEY_F) == 0);
513         iu = container_of(wc->wr_cqe, struct rtrs_iu,
514                           cqe);
515         err = rtrs_iu_post_recv(&con->c, iu);
516         if (err) {
517                 rtrs_err(con->c.sess, "post iu failed %d\n", err);
518                 rtrs_rdma_error_recovery(con);
519         }
520 }
521
522 static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con *con, struct ib_wc *wc)
523 {
524         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
525         struct rtrs_msg_rkey_rsp *msg;
526         u32 imm_type, imm_payload;
527         bool w_inval = false;
528         struct rtrs_iu *iu;
529         u32 buf_id;
530         int err;
531
532         WARN_ON((sess->flags & RTRS_MSG_NEW_RKEY_F) == 0);
533
534         iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
535
536         if (wc->byte_len < sizeof(*msg)) {
537                 rtrs_err(con->c.sess, "rkey response is malformed: size %d\n",
538                           wc->byte_len);
539                 goto out;
540         }
541         ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
542                                    iu->size, DMA_FROM_DEVICE);
543         msg = iu->buf;
544         if (le16_to_cpu(msg->type) != RTRS_MSG_RKEY_RSP) {
545                 rtrs_err(sess->clt, "rkey response is malformed: type %d\n",
546                           le16_to_cpu(msg->type));
547                 goto out;
548         }
549         buf_id = le16_to_cpu(msg->buf_id);
550         if (WARN_ON(buf_id >= sess->queue_depth))
551                 goto out;
552
553         rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), &imm_type, &imm_payload);
554         if (imm_type == RTRS_IO_RSP_IMM ||
555             imm_type == RTRS_IO_RSP_W_INV_IMM) {
556                 u32 msg_id;
557
558                 w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
559                 rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
560
561                 if (WARN_ON(buf_id != msg_id))
562                         goto out;
563                 sess->rbufs[buf_id].rkey = le32_to_cpu(msg->rkey);
564                 process_io_rsp(sess, msg_id, err, w_inval);
565         }
566         ib_dma_sync_single_for_device(sess->s.dev->ib_dev, iu->dma_addr,
567                                       iu->size, DMA_FROM_DEVICE);
568         return rtrs_clt_recv_done(con, wc);
569 out:
570         rtrs_rdma_error_recovery(con);
571 }
572
573 static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
574
575 static struct ib_cqe io_comp_cqe = {
576         .done = rtrs_clt_rdma_done
577 };
578
579 /*
580  * Post x2 empty WRs: first is for this RDMA with IMM,
581  * second is for RECV with INV, which happened earlier.
582  */
583 static int rtrs_post_recv_empty_x2(struct rtrs_con *con, struct ib_cqe *cqe)
584 {
585         struct ib_recv_wr wr_arr[2], *wr;
586         int i;
587
588         memset(wr_arr, 0, sizeof(wr_arr));
589         for (i = 0; i < ARRAY_SIZE(wr_arr); i++) {
590                 wr = &wr_arr[i];
591                 wr->wr_cqe  = cqe;
592                 if (i)
593                         /* Chain backwards */
594                         wr->next = &wr_arr[i - 1];
595         }
596
597         return ib_post_recv(con->qp, wr, NULL);
598 }
599
600 static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
601 {
602         struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
603         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
604         u32 imm_type, imm_payload;
605         bool w_inval = false;
606         int err;
607
608         if (wc->status != IB_WC_SUCCESS) {
609                 if (wc->status != IB_WC_WR_FLUSH_ERR) {
610                         rtrs_err(sess->clt, "RDMA failed: %s\n",
611                                   ib_wc_status_msg(wc->status));
612                         rtrs_rdma_error_recovery(con);
613                 }
614                 return;
615         }
616         rtrs_clt_update_wc_stats(con);
617
618         switch (wc->opcode) {
619         case IB_WC_RECV_RDMA_WITH_IMM:
620                 /*
621                  * post_recv() RDMA write completions of IO reqs (read/write)
622                  * and hb
623                  */
624                 if (WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done))
625                         return;
626                 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
627                                &imm_type, &imm_payload);
628                 if (imm_type == RTRS_IO_RSP_IMM ||
629                     imm_type == RTRS_IO_RSP_W_INV_IMM) {
630                         u32 msg_id;
631
632                         w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
633                         rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
634
635                         process_io_rsp(sess, msg_id, err, w_inval);
636                 } else if (imm_type == RTRS_HB_MSG_IMM) {
637                         WARN_ON(con->c.cid);
638                         rtrs_send_hb_ack(&sess->s);
639                         if (sess->flags & RTRS_MSG_NEW_RKEY_F)
640                                 return  rtrs_clt_recv_done(con, wc);
641                 } else if (imm_type == RTRS_HB_ACK_IMM) {
642                         WARN_ON(con->c.cid);
643                         sess->s.hb_missed_cnt = 0;
644                         sess->s.hb_cur_latency =
645                                 ktime_sub(ktime_get(), sess->s.hb_last_sent);
646                         if (sess->flags & RTRS_MSG_NEW_RKEY_F)
647                                 return  rtrs_clt_recv_done(con, wc);
648                 } else {
649                         rtrs_wrn(con->c.sess, "Unknown IMM type %u\n",
650                                   imm_type);
651                 }
652                 if (w_inval)
653                         /*
654                          * Post x2 empty WRs: first is for this RDMA with IMM,
655                          * second is for RECV with INV, which happened earlier.
656                          */
657                         err = rtrs_post_recv_empty_x2(&con->c, &io_comp_cqe);
658                 else
659                         err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
660                 if (err) {
661                         rtrs_err(con->c.sess, "rtrs_post_recv_empty(): %d\n",
662                                   err);
663                         rtrs_rdma_error_recovery(con);
664                 }
665                 break;
666         case IB_WC_RECV:
667                 /*
668                  * Key invalidations from server side
669                  */
670                 WARN_ON(!(wc->wc_flags & IB_WC_WITH_INVALIDATE ||
671                           wc->wc_flags & IB_WC_WITH_IMM));
672                 WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done);
673                 if (sess->flags & RTRS_MSG_NEW_RKEY_F) {
674                         if (wc->wc_flags & IB_WC_WITH_INVALIDATE)
675                                 return  rtrs_clt_recv_done(con, wc);
676
677                         return  rtrs_clt_rkey_rsp_done(con, wc);
678                 }
679                 break;
680         case IB_WC_RDMA_WRITE:
681                 /*
682                  * post_send() RDMA write completions of IO reqs (read/write)
683                  * and hb.
684                  */
685                 break;
686
687         default:
688                 rtrs_wrn(sess->clt, "Unexpected WC type: %d\n", wc->opcode);
689                 return;
690         }
691 }
692
693 static int post_recv_io(struct rtrs_clt_con *con, size_t q_size)
694 {
695         int err, i;
696         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
697
698         for (i = 0; i < q_size; i++) {
699                 if (sess->flags & RTRS_MSG_NEW_RKEY_F) {
700                         struct rtrs_iu *iu = &con->rsp_ius[i];
701
702                         err = rtrs_iu_post_recv(&con->c, iu);
703                 } else {
704                         err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
705                 }
706                 if (err)
707                         return err;
708         }
709
710         return 0;
711 }
712
713 static int post_recv_sess(struct rtrs_clt_sess *sess)
714 {
715         size_t q_size = 0;
716         int err, cid;
717
718         for (cid = 0; cid < sess->s.con_num; cid++) {
719                 if (cid == 0)
720                         q_size = SERVICE_CON_QUEUE_DEPTH;
721                 else
722                         q_size = sess->queue_depth;
723
724                 /*
725                  * x2 for RDMA read responses + FR key invalidations,
726                  * RDMA writes do not require any FR registrations.
727                  */
728                 q_size *= 2;
729
730                 err = post_recv_io(to_clt_con(sess->s.con[cid]), q_size);
731                 if (err) {
732                         rtrs_err(sess->clt, "post_recv_io(), err: %d\n", err);
733                         return err;
734                 }
735         }
736
737         return 0;
738 }
739
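/*
 * Iterator over the client paths used by the multipath policies below:
 * @i counts how many paths have been iterated, @skip_list collects the
 * per-CPU ->mp_skip_entry entries of already-chosen paths (cleared again
 * in path_it_deinit()), and @next_path is the policy callback selected
 * in path_it_init().
 */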
740 struct path_it {
741         int i;
742         struct list_head skip_list;
743         struct rtrs_clt *clt;
744         struct rtrs_clt_sess *(*next_path)(struct path_it *it);
745 };
746
747 /**
748  * list_next_or_null_rr_rcu - get next list element in round-robin fashion.
749  * @head:       the head for the list.
750  * @ptr:        the list head to take the next element from.
751  * @type:       the type of the struct this is embedded in.
752  * @memb:       the name of the list_head within the struct.
753  *
754  * The next element is returned in round-robin fashion, i.e. the head is
755  * skipped, but if the list is observed as empty, NULL is returned.
756  *
757  * This primitive may safely run concurrently with the _rcu list-mutation
758  * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock().
759  */
760 #define list_next_or_null_rr_rcu(head, ptr, type, memb) \
761 ({ \
762         list_next_or_null_rcu(head, ptr, type, memb) ?: \
763                 list_next_or_null_rcu(head, READ_ONCE((ptr)->next), \
764                                       type, memb); \
765 })
766
767 /**
768  * get_next_path_rr() - Returns path in round-robin fashion.
769  * @it: the path pointer
770  *
771  * Related to @MP_POLICY_RR
772  *
773  * Locks:
774  *    rcu_read_lock() must be held.
775  */
776 static struct rtrs_clt_sess *get_next_path_rr(struct path_it *it)
777 {
778         struct rtrs_clt_sess __rcu **ppcpu_path;
779         struct rtrs_clt_sess *path;
780         struct rtrs_clt *clt;
781
782         clt = it->clt;
783
784         /*
785          * Here we use two RCU objects: @paths_list and @pcpu_path
786          * pointer.  See rtrs_clt_remove_path_from_arr() for details
787          * on how that is handled.
788          */
789
790         ppcpu_path = this_cpu_ptr(clt->pcpu_path);
791         path = rcu_dereference(*ppcpu_path);
792         if (!path)
793                 path = list_first_or_null_rcu(&clt->paths_list,
794                                               typeof(*path), s.entry);
795         else
796                 path = list_next_or_null_rr_rcu(&clt->paths_list,
797                                                 &path->s.entry,
798                                                 typeof(*path),
799                                                 s.entry);
800         rcu_assign_pointer(*ppcpu_path, path);
801
802         return path;
803 }
804
805 /**
806  * get_next_path_min_inflight() - Returns path with minimal inflight count.
807  * @it: the path pointer
808  *
809  * Related to @MP_POLICY_MIN_INFLIGHT
810  *
811  * Locks:
812  *    rcu_read_lock() must be held.
813  */
814 static struct rtrs_clt_sess *get_next_path_min_inflight(struct path_it *it)
815 {
816         struct rtrs_clt_sess *min_path = NULL;
817         struct rtrs_clt *clt = it->clt;
818         struct rtrs_clt_sess *sess;
819         int min_inflight = INT_MAX;
820         int inflight;
821
822         list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
823                 if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)
824                         continue;
825
826                 if (!list_empty(raw_cpu_ptr(sess->mp_skip_entry)))
827                         continue;
828
829                 inflight = atomic_read(&sess->stats->inflight);
830
831                 if (inflight < min_inflight) {
832                         min_inflight = inflight;
833                         min_path = sess;
834                 }
835         }
836
837         /*
838          * add the path to the skip list, so that next time we can get
839          * a different one
840          */
841         if (min_path)
842                 list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);
843
844         return min_path;
845 }
846
847 /**
848  * get_next_path_min_latency() - Returns path with minimal latency.
849  * @it: the path pointer
850  *
851  * Return: a path with the lowest latency or NULL if all paths are tried
852  *
853  * Locks:
854  *    rcu_read_lock() must be held.
855  *
856  * Related to @MP_POLICY_MIN_LATENCY
857  *
858  * This DOES skip an already-tried path.
859  * There is a skip-list to skip a path if it has been tried but failed.
860  * It will try the minimum-latency path, then the second minimum-latency
861  * path, and so on. Finally it returns NULL once all paths have been tried.
862  * Therefore the caller MUST check whether the returned
863  * path is NULL and trigger the IO error.
864  */
865 static struct rtrs_clt_sess *get_next_path_min_latency(struct path_it *it)
866 {
867         struct rtrs_clt_sess *min_path = NULL;
868         struct rtrs_clt *clt = it->clt;
869         struct rtrs_clt_sess *sess;
870         ktime_t min_latency = KTIME_MAX;
871         ktime_t latency;
872
873         list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
874                 if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)
875                         continue;
876
877                 if (!list_empty(raw_cpu_ptr(sess->mp_skip_entry)))
878                         continue;
879
880                 latency = sess->s.hb_cur_latency;
881
882                 if (latency < min_latency) {
883                         min_latency = latency;
884                         min_path = sess;
885                 }
886         }
887
888         /*
889          * add the path to the skip list, so that next time we can get
890          * a different one
891          */
892         if (min_path)
893                 list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);
894
895         return min_path;
896 }
897
898 static inline void path_it_init(struct path_it *it, struct rtrs_clt *clt)
899 {
900         INIT_LIST_HEAD(&it->skip_list);
901         it->clt = clt;
902         it->i = 0;
903
904         if (clt->mp_policy == MP_POLICY_RR)
905                 it->next_path = get_next_path_rr;
906         else if (clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
907                 it->next_path = get_next_path_min_inflight;
908         else
909                 it->next_path = get_next_path_min_latency;
910 }
911
912 static inline void path_it_deinit(struct path_it *it)
913 {
914         struct list_head *skip, *tmp;
915         /*
916          * The skip_list is used only for the MIN_INFLIGHT and MIN_LATENCY
917          * policies. We need to remove paths from it, so that the next IO
918          * can insert paths (->mp_skip_entry) into a skip_list again.
919          */
920         list_for_each_safe(skip, tmp, &it->skip_list)
921                 list_del_init(skip);
922 }
923
924 /**
925  * rtrs_clt_init_req() - Initialize an rtrs_clt_io_req holding information
926  * about an inflight IO.
927  * The user buffer holding user control message (not data) is copied into
928  * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will
929  * also hold the control message of rtrs.
930  * @req: an io request holding information about IO.
931  * @sess: client session
932  * @conf: confirmation callback function to notify the upper layer.
933  * @permit: permit for allocation of RDMA remote buffer
934  * @priv: private pointer
935  * @vec: kernel vector containing control message
936  * @usr_len: length of the user message
937  * @sg: scatter list for IO data
938  * @sg_cnt: number of scatter list entries
939  * @data_len: length of the IO data
940  * @dir: direction of the IO.
941  */
942 static void rtrs_clt_init_req(struct rtrs_clt_io_req *req,
943                               struct rtrs_clt_sess *sess,
944                               void (*conf)(void *priv, int errno),
945                               struct rtrs_permit *permit, void *priv,
946                               const struct kvec *vec, size_t usr_len,
947                               struct scatterlist *sg, size_t sg_cnt,
948                               size_t data_len, int dir)
949 {
950         struct iov_iter iter;
951         size_t len;
952
953         req->permit = permit;
954         req->in_use = true;
955         req->usr_len = usr_len;
956         req->data_len = data_len;
957         req->sglist = sg;
958         req->sg_cnt = sg_cnt;
959         req->priv = priv;
960         req->dir = dir;
961         req->con = rtrs_permit_to_clt_con(sess, permit);
962         req->conf = conf;
963         req->need_inv = false;
964         req->need_inv_comp = false;
965         req->inv_errno = 0;
966         refcount_set(&req->ref, 1);
967         req->mp_policy = sess->clt->mp_policy;
968
969         iov_iter_kvec(&iter, READ, vec, 1, usr_len);
970         len = _copy_from_iter(req->iu->buf, usr_len, &iter);
971         WARN_ON(len != usr_len);
972
973         reinit_completion(&req->inv_comp);
974 }
975
976 static struct rtrs_clt_io_req *
977 rtrs_clt_get_req(struct rtrs_clt_sess *sess,
978                  void (*conf)(void *priv, int errno),
979                  struct rtrs_permit *permit, void *priv,
980                  const struct kvec *vec, size_t usr_len,
981                  struct scatterlist *sg, size_t sg_cnt,
982                  size_t data_len, int dir)
983 {
984         struct rtrs_clt_io_req *req;
985
986         req = &sess->reqs[permit->mem_id];
987         rtrs_clt_init_req(req, sess, conf, permit, priv, vec, usr_len,
988                            sg, sg_cnt, data_len, dir);
989         return req;
990 }
991
992 static struct rtrs_clt_io_req *
993 rtrs_clt_get_copy_req(struct rtrs_clt_sess *alive_sess,
994                        struct rtrs_clt_io_req *fail_req)
995 {
996         struct rtrs_clt_io_req *req;
997         struct kvec vec = {
998                 .iov_base = fail_req->iu->buf,
999                 .iov_len  = fail_req->usr_len
1000         };
1001
1002         req = &alive_sess->reqs[fail_req->permit->mem_id];
1003         rtrs_clt_init_req(req, alive_sess, fail_req->conf, fail_req->permit,
1004                            fail_req->priv, &vec, fail_req->usr_len,
1005                            fail_req->sglist, fail_req->sg_cnt,
1006                            fail_req->data_len, fail_req->dir);
1007         return req;
1008 }
1009
1010 static int rtrs_post_rdma_write_sg(struct rtrs_clt_con *con,
1011                                    struct rtrs_clt_io_req *req,
1012                                    struct rtrs_rbuf *rbuf, bool fr_en,
1013                                    u32 size, u32 imm, struct ib_send_wr *wr,
1014                                    struct ib_send_wr *tail)
1015 {
1016         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1017         struct ib_sge *sge = req->sge;
1018         enum ib_send_flags flags;
1019         struct scatterlist *sg;
1020         size_t num_sge;
1021         int i;
1022         struct ib_send_wr *ptail = NULL;
1023
1024         if (fr_en) {
1025                 i = 0;
1026                 sge[i].addr   = req->mr->iova;
1027                 sge[i].length = req->mr->length;
1028                 sge[i].lkey   = req->mr->lkey;
1029                 i++;
1030                 num_sge = 2;
1031                 ptail = tail;
1032         } else {
1033                 for_each_sg(req->sglist, sg, req->sg_cnt, i) {
1034                         sge[i].addr   = sg_dma_address(sg);
1035                         sge[i].length = sg_dma_len(sg);
1036                         sge[i].lkey   = sess->s.dev->ib_pd->local_dma_lkey;
1037                 }
1038                 num_sge = 1 + req->sg_cnt;
1039         }
1040         sge[i].addr   = req->iu->dma_addr;
1041         sge[i].length = size;
1042         sge[i].lkey   = sess->s.dev->ib_pd->local_dma_lkey;
1043
1044         /*
1045          * From time to time we have to post signalled sends,
1046          * or the send queue will fill up and only a QP reset can help.
1047          */
1048         flags = atomic_inc_return(&con->c.wr_cnt) % sess->s.signal_interval ?
1049                         0 : IB_SEND_SIGNALED;
1050
1051         ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
1052                                       size, DMA_TO_DEVICE);
1053
1054         return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, sge, num_sge,
1055                                             rbuf->rkey, rbuf->addr, imm,
1056                                             flags, wr, ptail);
1057 }
1058
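/*
 * Map the request's scatterlist into its fast-registration MR, aligned to a
 * 4K page size, and refresh the rkey for the new registration. Returns the
 * number of mapped entries or a negative errno; -EINVAL if the scatterlist
 * does not fit into a single MR.
 */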
1059 static int rtrs_map_sg_fr(struct rtrs_clt_io_req *req, size_t count)
1060 {
1061         int nr;
1062
1063         /* Align the MR to a 4K page size to match the block virt boundary */
1064         nr = ib_map_mr_sg(req->mr, req->sglist, count, NULL, SZ_4K);
1065         if (nr < 0)
1066                 return nr;
1067         if (nr < req->sg_cnt)
1068                 return -EINVAL;
1069         ib_update_fast_reg_key(req->mr, ib_inc_rkey(req->mr->rkey));
1070
1071         return nr;
1072 }
1073
1074 static int rtrs_clt_write_req(struct rtrs_clt_io_req *req)
1075 {
1076         struct rtrs_clt_con *con = req->con;
1077         struct rtrs_sess *s = con->c.sess;
1078         struct rtrs_clt_sess *sess = to_clt_sess(s);
1079         struct rtrs_msg_rdma_write *msg;
1080
1081         struct rtrs_rbuf *rbuf;
1082         int ret, count = 0;
1083         u32 imm, buf_id;
1084         struct ib_reg_wr rwr;
1085         struct ib_send_wr inv_wr;
1086         struct ib_send_wr *wr = NULL;
1087         bool fr_en = false;
1088
1089         const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;
1090
1091         if (tsize > sess->chunk_size) {
1092                 rtrs_wrn(s, "Write request failed, size too big %zu > %d\n",
1093                           tsize, sess->chunk_size);
1094                 return -EMSGSIZE;
1095         }
1096         if (req->sg_cnt) {
1097                 count = ib_dma_map_sg(sess->s.dev->ib_dev, req->sglist,
1098                                       req->sg_cnt, req->dir);
1099                 if (!count) {
1100                         rtrs_wrn(s, "Write request failed, map failed\n");
1101                         return -EINVAL;
1102                 }
1103         }
1104         /* put rtrs msg after sg and user message */
1105         msg = req->iu->buf + req->usr_len;
1106         msg->type = cpu_to_le16(RTRS_MSG_WRITE);
1107         msg->usr_len = cpu_to_le16(req->usr_len);
1108
1109         /* rtrs message on server side will be after user data and message */
1110         imm = req->permit->mem_off + req->data_len + req->usr_len;
1111         imm = rtrs_to_io_req_imm(imm);
1112         buf_id = req->permit->mem_id;
1113         req->sg_size = tsize;
1114         rbuf = &sess->rbufs[buf_id];
1115
1116         if (count) {
1117                 ret = rtrs_map_sg_fr(req, count);
1118                 if (ret < 0) {
1119                         rtrs_err_rl(s,
1120                                     "Write request failed, failed to map fast reg. data, err: %d\n",
1121                                     ret);
1122                         ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
1123                                         req->sg_cnt, req->dir);
1124                         return ret;
1125                 }
1126                 inv_wr = (struct ib_send_wr) {
1127                         .opcode             = IB_WR_LOCAL_INV,
1128                         .wr_cqe             = &req->inv_cqe,
1129                         .send_flags         = IB_SEND_SIGNALED,
1130                         .ex.invalidate_rkey = req->mr->rkey,
1131                 };
1132                 req->inv_cqe.done = rtrs_clt_inv_rkey_done;
1133                 rwr = (struct ib_reg_wr) {
1134                         .wr.opcode = IB_WR_REG_MR,
1135                         .wr.wr_cqe = &fast_reg_cqe,
1136                         .mr = req->mr,
1137                         .key = req->mr->rkey,
1138                         .access = (IB_ACCESS_LOCAL_WRITE),
1139                 };
1140                 wr = &rwr.wr;
1141                 fr_en = true;
1142                 refcount_inc(&req->ref);
1143         }
1144         /*
1145          * Update stats now, after request is successfully sent it is not
1146          * safe anymore to touch it.
1147          */
1148         rtrs_clt_update_all_stats(req, WRITE);
1149
1150         ret = rtrs_post_rdma_write_sg(req->con, req, rbuf, fr_en,
1151                                       req->usr_len + sizeof(*msg),
1152                                       imm, wr, &inv_wr);
1153         if (ret) {
1154                 rtrs_err_rl(s,
1155                             "Write request failed: error=%d path=%s [%s:%u]\n",
1156                             ret, kobject_name(&sess->kobj), sess->hca_name,
1157                             sess->hca_port);
1158                 if (req->mp_policy == MP_POLICY_MIN_INFLIGHT)
1159                         atomic_dec(&sess->stats->inflight);
1160                 if (req->sg_cnt)
1161                         ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
1162                                         req->sg_cnt, req->dir);
1163         }
1164
1165         return ret;
1166 }
1167
1168 static int rtrs_clt_read_req(struct rtrs_clt_io_req *req)
1169 {
1170         struct rtrs_clt_con *con = req->con;
1171         struct rtrs_sess *s = con->c.sess;
1172         struct rtrs_clt_sess *sess = to_clt_sess(s);
1173         struct rtrs_msg_rdma_read *msg;
1174         struct rtrs_ib_dev *dev = sess->s.dev;
1175
1176         struct ib_reg_wr rwr;
1177         struct ib_send_wr *wr = NULL;
1178
1179         int ret, count = 0;
1180         u32 imm, buf_id;
1181
1182         const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;
1183
1184         if (tsize > sess->chunk_size) {
1185                 rtrs_wrn(s,
1186                           "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n",
1187                           tsize, sess->chunk_size);
1188                 return -EMSGSIZE;
1189         }
1190
1191         if (req->sg_cnt) {
1192                 count = ib_dma_map_sg(dev->ib_dev, req->sglist, req->sg_cnt,
1193                                       req->dir);
1194                 if (!count) {
1195                         rtrs_wrn(s,
1196                                   "Read request failed, dma map failed\n");
1197                         return -EINVAL;
1198                 }
1199         }
1200         /* put our message into req->buf after user message */
1201         msg = req->iu->buf + req->usr_len;
1202         msg->type = cpu_to_le16(RTRS_MSG_READ);
1203         msg->usr_len = cpu_to_le16(req->usr_len);
1204
1205         if (count) {
1206                 ret = rtrs_map_sg_fr(req, count);
1207                 if (ret < 0) {
1208                         rtrs_err_rl(s,
1209                                      "Read request failed, failed to map fast reg. data, err: %d\n",
1210                                      ret);
1211                         ib_dma_unmap_sg(dev->ib_dev, req->sglist, req->sg_cnt,
1212                                         req->dir);
1213                         return ret;
1214                 }
1215                 rwr = (struct ib_reg_wr) {
1216                         .wr.opcode = IB_WR_REG_MR,
1217                         .wr.wr_cqe = &fast_reg_cqe,
1218                         .mr = req->mr,
1219                         .key = req->mr->rkey,
1220                         .access = (IB_ACCESS_LOCAL_WRITE |
1221                                    IB_ACCESS_REMOTE_WRITE),
1222                 };
1223                 wr = &rwr.wr;
1224
1225                 msg->sg_cnt = cpu_to_le16(1);
1226                 msg->flags = cpu_to_le16(RTRS_MSG_NEED_INVAL_F);
1227
1228                 msg->desc[0].addr = cpu_to_le64(req->mr->iova);
1229                 msg->desc[0].key = cpu_to_le32(req->mr->rkey);
1230                 msg->desc[0].len = cpu_to_le32(req->mr->length);
1231
1232                 /* Further invalidation is required */
1233                 req->need_inv = !!RTRS_MSG_NEED_INVAL_F;
1234
1235         } else {
1236                 msg->sg_cnt = 0;
1237                 msg->flags = 0;
1238         }
1239         /*
1240          * rtrs message will be after the space reserved for disk data and
1241          * user message
1242          */
1243         imm = req->permit->mem_off + req->data_len + req->usr_len;
1244         imm = rtrs_to_io_req_imm(imm);
1245         buf_id = req->permit->mem_id;
1246
1247         req->sg_size  = sizeof(*msg);
1248         req->sg_size += le16_to_cpu(msg->sg_cnt) * sizeof(struct rtrs_sg_desc);
1249         req->sg_size += req->usr_len;
1250
1251         /*
1252          * Update stats now, after request is successfully sent it is not
1253          * safe anymore to touch it.
1254          */
1255         rtrs_clt_update_all_stats(req, READ);
1256
1257         ret = rtrs_post_send_rdma(req->con, req, &sess->rbufs[buf_id],
1258                                    req->data_len, imm, wr);
1259         if (ret) {
1260                 rtrs_err_rl(s,
1261                             "Read request failed: error=%d path=%s [%s:%u]\n",
1262                             ret, kobject_name(&sess->kobj), sess->hca_name,
1263                             sess->hca_port);
1264                 if (req->mp_policy == MP_POLICY_MIN_INFLIGHT)
1265                         atomic_dec(&sess->stats->inflight);
1266                 req->need_inv = false;
1267                 if (req->sg_cnt)
1268                         ib_dma_unmap_sg(dev->ib_dev, req->sglist,
1269                                         req->sg_cnt, req->dir);
1270         }
1271
1272         return ret;
1273 }
1274
1275 /**
1276  * rtrs_clt_failover_req() - Try to find an active path for a failed request
1277  * @clt: clt context
1278  * @fail_req: a failed io request.
1279  */
1280 static int rtrs_clt_failover_req(struct rtrs_clt *clt,
1281                                  struct rtrs_clt_io_req *fail_req)
1282 {
1283         struct rtrs_clt_sess *alive_sess;
1284         struct rtrs_clt_io_req *req;
1285         int err = -ECONNABORTED;
1286         struct path_it it;
1287
1288         rcu_read_lock();
1289         for (path_it_init(&it, clt);
1290              (alive_sess = it.next_path(&it)) && it.i < it.clt->paths_num;
1291              it.i++) {
1292                 if (READ_ONCE(alive_sess->state) != RTRS_CLT_CONNECTED)
1293                         continue;
1294                 req = rtrs_clt_get_copy_req(alive_sess, fail_req);
1295                 if (req->dir == DMA_TO_DEVICE)
1296                         err = rtrs_clt_write_req(req);
1297                 else
1298                         err = rtrs_clt_read_req(req);
1299                 if (err) {
1300                         req->in_use = false;
1301                         continue;
1302                 }
1303                 /* Success path */
1304                 rtrs_clt_inc_failover_cnt(alive_sess->stats);
1305                 break;
1306         }
1307         path_it_deinit(&it);
1308         rcu_read_unlock();
1309
1310         return err;
1311 }
1312
1313 static void fail_all_outstanding_reqs(struct rtrs_clt_sess *sess)
1314 {
1315         struct rtrs_clt *clt = sess->clt;
1316         struct rtrs_clt_io_req *req;
1317         int i, err;
1318
1319         if (!sess->reqs)
1320                 return;
1321         for (i = 0; i < sess->queue_depth; ++i) {
1322                 req = &sess->reqs[i];
1323                 if (!req->in_use)
1324                         continue;
1325
1326                 /*
1327                  * Safely (without notification) complete the failed request.
1328                  * After completion this request is still usable and can
1329                  * be failed over to another path.
1330                  */
1331                 complete_rdma_req(req, -ECONNABORTED, false, true);
1332
1333                 err = rtrs_clt_failover_req(clt, req);
1334                 if (err)
1335                         /* Failover failed, notify anyway */
1336                         req->conf(req->priv, err);
1337         }
1338 }
1339
1340 static void free_sess_reqs(struct rtrs_clt_sess *sess)
1341 {
1342         struct rtrs_clt_io_req *req;
1343         int i;
1344
1345         if (!sess->reqs)
1346                 return;
1347         for (i = 0; i < sess->queue_depth; ++i) {
1348                 req = &sess->reqs[i];
1349                 if (req->mr)
1350                         ib_dereg_mr(req->mr);
1351                 kfree(req->sge);
1352                 rtrs_iu_free(req->iu, sess->s.dev->ib_dev, 1);
1353         }
1354         kfree(sess->reqs);
1355         sess->reqs = NULL;
1356 }
1357
1358 static int alloc_sess_reqs(struct rtrs_clt_sess *sess)
1359 {
1360         struct rtrs_clt_io_req *req;
1361         int i, err = -ENOMEM;
1362
1363         sess->reqs = kcalloc(sess->queue_depth, sizeof(*sess->reqs),
1364                              GFP_KERNEL);
1365         if (!sess->reqs)
1366                 return -ENOMEM;
1367
1368         for (i = 0; i < sess->queue_depth; ++i) {
1369                 req = &sess->reqs[i];
1370                 req->iu = rtrs_iu_alloc(1, sess->max_hdr_size, GFP_KERNEL,
1371                                          sess->s.dev->ib_dev,
1372                                          DMA_TO_DEVICE,
1373                                          rtrs_clt_rdma_done);
1374                 if (!req->iu)
1375                         goto out;
1376
1377                 req->sge = kcalloc(2, sizeof(*req->sge), GFP_KERNEL);
1378                 if (!req->sge)
1379                         goto out;
1380
1381                 req->mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
1382                                       sess->max_pages_per_mr);
1383                 if (IS_ERR(req->mr)) {
1384                         err = PTR_ERR(req->mr);
1385                         req->mr = NULL;
1386                         pr_err("Failed to alloc sess->max_pages_per_mr %d\n",
1387                                sess->max_pages_per_mr);
1388                         goto out;
1389                 }
1390
1391                 init_completion(&req->inv_comp);
1392         }
1393
1394         return 0;
1395
1396 out:
1397         free_sess_reqs(sess);
1398
1399         return err;
1400 }
1401
1402 static int alloc_permits(struct rtrs_clt *clt)
1403 {
1404         unsigned int chunk_bits;
1405         int err, i;
1406
1407         clt->permits_map = kcalloc(BITS_TO_LONGS(clt->queue_depth),
1408                                    sizeof(long), GFP_KERNEL);
1409         if (!clt->permits_map) {
1410                 err = -ENOMEM;
1411                 goto out_err;
1412         }
1413         clt->permits = kcalloc(clt->queue_depth, permit_size(clt), GFP_KERNEL);
1414         if (!clt->permits) {
1415                 err = -ENOMEM;
1416                 goto err_map;
1417         }
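	/*
	 * chunk_bits is the number of bits needed to address queue_depth
	 * chunks; mem_off then places the permit index in the upper bits of
	 * the immediate payload. Illustrative only: a (hypothetical)
	 * queue_depth of 512 gives chunk_bits = ilog2(511) + 1 = 9.
	 */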
1418         chunk_bits = ilog2(clt->queue_depth - 1) + 1;
1419         for (i = 0; i < clt->queue_depth; i++) {
1420                 struct rtrs_permit *permit;
1421
1422                 permit = get_permit(clt, i);
1423                 permit->mem_id = i;
1424                 permit->mem_off = i << (MAX_IMM_PAYL_BITS - chunk_bits);
1425         }
1426
1427         return 0;
1428
1429 err_map:
1430         kfree(clt->permits_map);
1431         clt->permits_map = NULL;
1432 out_err:
1433         return err;
1434 }
1435
1436 static void free_permits(struct rtrs_clt *clt)
1437 {
1438         if (clt->permits_map) {
1439                 size_t sz = clt->queue_depth;
1440
1441                 wait_event(clt->permits_wait,
1442                            find_first_bit(clt->permits_map, sz) >= sz);
1443         }
1444         kfree(clt->permits_map);
1445         clt->permits_map = NULL;
1446         kfree(clt->permits);
1447         clt->permits = NULL;
1448 }
1449
1450 static void query_fast_reg_mode(struct rtrs_clt_sess *sess)
1451 {
1452         struct ib_device *ib_dev;
1453         u64 max_pages_per_mr;
1454         int mr_page_shift;
1455
1456         ib_dev = sess->s.dev->ib_dev;
1457
1458         /*
1459          * Use the smallest page size supported by the HCA, down to a
1460          * minimum of 4096 bytes. We're unlikely to build large sglists
1461          * out of smaller entries.
1462          */
1463         mr_page_shift      = max(12, ffs(ib_dev->attrs.page_size_cap) - 1);
1464         max_pages_per_mr   = ib_dev->attrs.max_mr_size;
1465         do_div(max_pages_per_mr, (1ull << mr_page_shift));
1466         sess->max_pages_per_mr =
1467                 min3(sess->max_pages_per_mr, (u32)max_pages_per_mr,
1468                      ib_dev->attrs.max_fast_reg_page_list_len);
1469         sess->clt->max_segments =
1470                 min(sess->max_pages_per_mr, sess->clt->max_segments);
1471 }
1472
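/*
 * Change the session state under state_wq.lock and, if requested, report
 * the state that was current right before the change attempt.
 */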
1473 static bool rtrs_clt_change_state_get_old(struct rtrs_clt_sess *sess,
1474                                            enum rtrs_clt_state new_state,
1475                                            enum rtrs_clt_state *old_state)
1476 {
1477         bool changed;
1478
1479         spin_lock_irq(&sess->state_wq.lock);
1480         if (old_state)
1481                 *old_state = sess->state;
1482         changed = rtrs_clt_change_state(sess, new_state);
1483         spin_unlock_irq(&sess->state_wq.lock);
1484
1485         return changed;
1486 }
1487
1488 static void rtrs_clt_hb_err_handler(struct rtrs_con *c)
1489 {
1490         struct rtrs_clt_con *con = container_of(c, typeof(*con), c);
1491
1492         rtrs_rdma_error_recovery(con);
1493 }
1494
1495 static void rtrs_clt_init_hb(struct rtrs_clt_sess *sess)
1496 {
1497         rtrs_init_hb(&sess->s, &io_comp_cqe,
1498                       RTRS_HB_INTERVAL_MS,
1499                       RTRS_HB_MISSED_MAX,
1500                       rtrs_clt_hb_err_handler,
1501                       rtrs_wq);
1502 }
1503
1504 static void rtrs_clt_reconnect_work(struct work_struct *work);
1505 static void rtrs_clt_close_work(struct work_struct *work);
1506
1507 static struct rtrs_clt_sess *alloc_sess(struct rtrs_clt *clt,
1508                                         const struct rtrs_addr *path,
1509                                         size_t con_num, u32 nr_poll_queues)
1510 {
1511         struct rtrs_clt_sess *sess;
1512         int err = -ENOMEM;
1513         int cpu;
1514         size_t total_con;
1515
1516         sess = kzalloc(sizeof(*sess), GFP_KERNEL);
1517         if (!sess)
1518                 goto err;
1519
1520         /*
1521          * con_num irq-mode connections plus nr_poll_queues polling ones,
1522          * +1: extra connection for user messages
1523          */
1524         total_con = con_num + nr_poll_queues + 1;
1525         sess->s.con = kcalloc(total_con, sizeof(*sess->s.con), GFP_KERNEL);
1526         if (!sess->s.con)
1527                 goto err_free_sess;
1528
1529         sess->s.con_num = total_con;
1530         sess->s.irq_con_num = con_num + 1;
1531
1532         sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
1533         if (!sess->stats)
1534                 goto err_free_con;
1535
1536         mutex_init(&sess->init_mutex);
1537         uuid_gen(&sess->s.uuid);
1538         memcpy(&sess->s.dst_addr, path->dst,
1539                rdma_addr_size((struct sockaddr *)path->dst));
1540
1541         /*
1542          * rdma_resolve_addr() passes src_addr to cma_bind_addr, which
1543          * checks the sa_family to be non-zero. If user passed src_addr=NULL
1544          * the sess->src_addr will contain only zeros, which is then fine.
1545          */
1546         if (path->src)
1547                 memcpy(&sess->s.src_addr, path->src,
1548                        rdma_addr_size((struct sockaddr *)path->src));
1549         strscpy(sess->s.sessname, clt->sessname, sizeof(sess->s.sessname));
1550         sess->clt = clt;
1551         sess->max_pages_per_mr = RTRS_MAX_SEGMENTS;
1552         init_waitqueue_head(&sess->state_wq);
1553         sess->state = RTRS_CLT_CONNECTING;
1554         atomic_set(&sess->connected_cnt, 0);
1555         INIT_WORK(&sess->close_work, rtrs_clt_close_work);
1556         INIT_DELAYED_WORK(&sess->reconnect_dwork, rtrs_clt_reconnect_work);
1557         rtrs_clt_init_hb(sess);
1558
1559         sess->mp_skip_entry = alloc_percpu(typeof(*sess->mp_skip_entry));
1560         if (!sess->mp_skip_entry)
1561                 goto err_free_stats;
1562
1563         for_each_possible_cpu(cpu)
1564                 INIT_LIST_HEAD(per_cpu_ptr(sess->mp_skip_entry, cpu));
1565
1566         err = rtrs_clt_init_stats(sess->stats);
1567         if (err)
1568                 goto err_free_percpu;
1569
1570         return sess;
1571
1572 err_free_percpu:
1573         free_percpu(sess->mp_skip_entry);
1574 err_free_stats:
1575         kfree(sess->stats);
1576 err_free_con:
1577         kfree(sess->s.con);
1578 err_free_sess:
1579         kfree(sess);
1580 err:
1581         return ERR_PTR(err);
1582 }
1583
1584 void free_sess(struct rtrs_clt_sess *sess)
1585 {
1586         free_percpu(sess->mp_skip_entry);
1587         mutex_destroy(&sess->init_mutex);
1588         kfree(sess->s.con);
1589         kfree(sess->rbufs);
1590         kfree(sess);
1591 }
1592
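/*
 * create_con() - allocate one client connection and hook it into the
 * session's connection array.  cid 0 is the user (service) connection,
 * the remaining cids carry IO; wr_cnt starts at 1 to stay aligned with
 * the server side.
 */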
1593 static int create_con(struct rtrs_clt_sess *sess, unsigned int cid)
1594 {
1595         struct rtrs_clt_con *con;
1596
1597         con = kzalloc(sizeof(*con), GFP_KERNEL);
1598         if (!con)
1599                 return -ENOMEM;
1600
1601         /* Map first two connections to the first CPU */
1602         con->cpu  = (cid ? cid - 1 : 0) % nr_cpu_ids;
1603         con->c.cid = cid;
1604         con->c.sess = &sess->s;
1605         /* Align with srv, init as 1 */
1606         atomic_set(&con->c.wr_cnt, 1);
1607         mutex_init(&con->con_mutex);
1608
1609         sess->s.con[cid] = &con->c;
1610
1611         return 0;
1612 }
1613
1614 static void destroy_con(struct rtrs_clt_con *con)
1615 {
1616         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1617
1618         sess->s.con[con->c.cid] = NULL;
1619         mutex_destroy(&con->con_mutex);
1620         kfree(con);
1621 }
1622
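/*
 * create_con_cq_qp() - size and create the CQ/QP pair of a connection.
 *
 * For cid 0 (user connection) the IB device is picked up here and the
 * queues are sized from SERVICE_CON_QUEUE_DEPTH; IO connections reuse
 * that device and are sized from the negotiated queue_depth.  Response
 * IUs are allocated when the server requests per-IO rkeys
 * (RTRS_MSG_NEW_RKEY_F).  Poll-mode connections (cid >= irq_con_num)
 * get IB_POLL_DIRECT CQs, all others IB_POLL_SOFTIRQ.
 */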
1623 static int create_con_cq_qp(struct rtrs_clt_con *con)
1624 {
1625         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1626         u32 max_send_wr, max_recv_wr, cq_num, max_send_sge, wr_limit;
1627         int err, cq_vector;
1628         struct rtrs_msg_rkey_rsp *rsp;
1629
1630         lockdep_assert_held(&con->con_mutex);
1631         if (con->c.cid == 0) {
1632                 max_send_sge = 1;
1633                 /* We must be the first here */
1634                 if (WARN_ON(sess->s.dev))
1635                         return -EINVAL;
1636
1637                 /*
1638                  * The whole session uses device from user connection.
1639                  * Be careful not to close user connection before ib dev
1640                  * is gracefully put.
1641                  */
1642                 sess->s.dev = rtrs_ib_dev_find_or_add(con->c.cm_id->device,
1643                                                        &dev_pd);
1644                 if (!sess->s.dev) {
1645                         rtrs_wrn(sess->clt,
1646                                   "rtrs_ib_dev_find_or_add(): no memory\n");
1647                         return -ENOMEM;
1648                 }
1649                 sess->s.dev_ref = 1;
1650                 query_fast_reg_mode(sess);
1651                 wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;
1652                 /*
1653                  * Two (request + registration) completions for send
1654                  * Two for recv if always_invalidate is set on server
1655                  * or one for recv.
1656                  * + 2 for drain and heartbeat
1657                  * in case qp gets into error state.
1658                  */
1659                 max_send_wr =
1660                         min_t(int, wr_limit, SERVICE_CON_QUEUE_DEPTH * 2 + 2);
1661                 max_recv_wr = max_send_wr;
1662         } else {
1663                 /*
1664                  * Here we assume that session members are correctly set.
1665                  * This is always true if user connection (cid == 0) is
1666                  * established first.
1667                  */
1668                 if (WARN_ON(!sess->s.dev))
1669                         return -EINVAL;
1670                 if (WARN_ON(!sess->queue_depth))
1671                         return -EINVAL;
1672
1673                 wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;
1674                 /* Shared between connections */
1675                 sess->s.dev_ref++;
1676                 max_send_wr = min_t(int, wr_limit,
1677                               /* QD * (REQ + RSP + FR REGS or INVS) + drain */
1678                               sess->queue_depth * 3 + 1);
1679                 max_recv_wr = min_t(int, wr_limit,
1680                               sess->queue_depth * 3 + 1);
1681                 max_send_sge = 2;
1682         }
1683         atomic_set(&con->c.sq_wr_avail, max_send_wr);
1684         cq_num = max_send_wr + max_recv_wr;
1685         /* alloc iu to recv new rkey reply when server reports flags set */
1686         if (sess->flags & RTRS_MSG_NEW_RKEY_F || con->c.cid == 0) {
1687                 con->rsp_ius = rtrs_iu_alloc(cq_num, sizeof(*rsp),
1688                                               GFP_KERNEL, sess->s.dev->ib_dev,
1689                                               DMA_FROM_DEVICE,
1690                                               rtrs_clt_rdma_done);
1691                 if (!con->rsp_ius)
1692                         return -ENOMEM;
1693                 con->queue_num = cq_num;
1694         }
1695         cq_num = max_send_wr + max_recv_wr;
1696         cq_vector = con->cpu % sess->s.dev->ib_dev->num_comp_vectors;
1697         if (con->c.cid >= sess->s.irq_con_num)
1698                 err = rtrs_cq_qp_create(&sess->s, &con->c, max_send_sge,
1699                                         cq_vector, cq_num, max_send_wr,
1700                                         max_recv_wr, IB_POLL_DIRECT);
1701         else
1702                 err = rtrs_cq_qp_create(&sess->s, &con->c, max_send_sge,
1703                                         cq_vector, cq_num, max_send_wr,
1704                                         max_recv_wr, IB_POLL_SOFTIRQ);
1705         /*
1706          * In case of error we do not bother to clean previous allocations,
1707          * since destroy_con_cq_qp() must be called.
1708          */
1709         return err;
1710 }
1711
1712 static void destroy_con_cq_qp(struct rtrs_clt_con *con)
1713 {
1714         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1715
1716         /*
1717          * Be careful here: destroy_con_cq_qp() can be called even
1718          * if create_con_cq_qp() failed, see comments there.
1719          */
1720         lockdep_assert_held(&con->con_mutex);
1721         rtrs_cq_qp_destroy(&con->c);
1722         if (con->rsp_ius) {
1723                 rtrs_iu_free(con->rsp_ius, sess->s.dev->ib_dev, con->queue_num);
1724                 con->rsp_ius = NULL;
1725                 con->queue_num = 0;
1726         }
1727         if (sess->s.dev_ref && !--sess->s.dev_ref) {
1728                 rtrs_ib_dev_put(sess->s.dev);
1729                 sess->s.dev = NULL;
1730         }
1731 }
1732
1733 static void stop_cm(struct rtrs_clt_con *con)
1734 {
1735         rdma_disconnect(con->c.cm_id);
1736         if (con->c.qp)
1737                 ib_drain_qp(con->c.qp);
1738 }
1739
1740 static void destroy_cm(struct rtrs_clt_con *con)
1741 {
1742         rdma_destroy_id(con->c.cm_id);
1743         con->c.cm_id = NULL;
1744 }
1745
1746 static int rtrs_rdma_addr_resolved(struct rtrs_clt_con *con)
1747 {
1748         struct rtrs_sess *s = con->c.sess;
1749         int err;
1750
1751         mutex_lock(&con->con_mutex);
1752         err = create_con_cq_qp(con);
1753         mutex_unlock(&con->con_mutex);
1754         if (err) {
1755                 rtrs_err(s, "create_con_cq_qp(), err: %d\n", err);
1756                 return err;
1757         }
1758         err = rdma_resolve_route(con->c.cm_id, RTRS_CONNECT_TIMEOUT_MS);
1759         if (err)
1760                 rtrs_err(s, "Resolving route failed, err: %d\n", err);
1761
1762         return err;
1763 }
1764
1765 static int rtrs_rdma_route_resolved(struct rtrs_clt_con *con)
1766 {
1767         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1768         struct rtrs_clt *clt = sess->clt;
1769         struct rtrs_msg_conn_req msg;
1770         struct rdma_conn_param param;
1771
1772         int err;
1773
1774         param = (struct rdma_conn_param) {
1775                 .retry_count = 7,
1776                 .rnr_retry_count = 7,
1777                 .private_data = &msg,
1778                 .private_data_len = sizeof(msg),
1779         };
1780
1781         msg = (struct rtrs_msg_conn_req) {
1782                 .magic = cpu_to_le16(RTRS_MAGIC),
1783                 .version = cpu_to_le16(RTRS_PROTO_VER),
1784                 .cid = cpu_to_le16(con->c.cid),
1785                 .cid_num = cpu_to_le16(sess->s.con_num),
1786                 .recon_cnt = cpu_to_le16(sess->s.recon_cnt),
1787         };
1788         msg.first_conn = sess->for_new_clt ? FIRST_CONN : 0;
1789         uuid_copy(&msg.sess_uuid, &sess->s.uuid);
1790         uuid_copy(&msg.paths_uuid, &clt->paths_uuid);
1791
1792         err = rdma_connect_locked(con->c.cm_id, &param);
1793         if (err)
1794                 rtrs_err(clt, "rdma_connect_locked(): %d\n", err);
1795
1796         return err;
1797 }
1798
1799 static int rtrs_rdma_conn_established(struct rtrs_clt_con *con,
1800                                        struct rdma_cm_event *ev)
1801 {
1802         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1803         struct rtrs_clt *clt = sess->clt;
1804         const struct rtrs_msg_conn_rsp *msg;
1805         u16 version, queue_depth;
1806         int errno;
1807         u8 len;
1808
1809         msg = ev->param.conn.private_data;
1810         len = ev->param.conn.private_data_len;
1811         if (len < sizeof(*msg)) {
1812                 rtrs_err(clt, "Invalid RTRS connection response\n");
1813                 return -ECONNRESET;
1814         }
1815         if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
1816                 rtrs_err(clt, "Invalid RTRS magic\n");
1817                 return -ECONNRESET;
1818         }
1819         version = le16_to_cpu(msg->version);
1820         if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
1821                 rtrs_err(clt, "Unsupported major RTRS version: %d, expected %d\n",
1822                           version >> 8, RTRS_PROTO_VER_MAJOR);
1823                 return -ECONNRESET;
1824         }
1825         errno = le16_to_cpu(msg->errno);
1826         if (errno) {
1827                 rtrs_err(clt, "Invalid RTRS message: errno %d\n",
1828                           errno);
1829                 return -ECONNRESET;
1830         }
1831         if (con->c.cid == 0) {
1832                 queue_depth = le16_to_cpu(msg->queue_depth);
1833
1834                 if (sess->queue_depth > 0 && queue_depth != sess->queue_depth) {
1835                         rtrs_err(clt, "Error: queue depth changed\n");
1836
1837                         /*
1838                          * Stop any more reconnection attempts
1839                          */
1840                         sess->reconnect_attempts = -1;
1841                         rtrs_err(clt,
1842                                 "Disabling auto-reconnect. Trigger a manual reconnect after the issue is resolved\n");
1843                         return -ECONNRESET;
1844                 }
1845
1846                 if (!sess->rbufs) {
1847                         sess->rbufs = kcalloc(queue_depth, sizeof(*sess->rbufs),
1848                                               GFP_KERNEL);
1849                         if (!sess->rbufs)
1850                                 return -ENOMEM;
1851                 }
1852                 sess->queue_depth = queue_depth;
1853                 sess->s.signal_interval = min_not_zero(queue_depth,
1854                                                 (unsigned short) SERVICE_CON_QUEUE_DEPTH);
1855                 sess->max_hdr_size = le32_to_cpu(msg->max_hdr_size);
1856                 sess->max_io_size = le32_to_cpu(msg->max_io_size);
1857                 sess->flags = le32_to_cpu(msg->flags);
1858                 sess->chunk_size = sess->max_io_size + sess->max_hdr_size;
1859
1860                 /*
1861                  * Global IO size is always a minimum.
1862                  * If during a reconnection the server sends a slightly higher
1863                  * value, the client does not care and keeps the cached minimum.
1864                  *
1865                  * Since several sessions (paths) can be re-establishing their
1866                  * connections in parallel, take the lock.
1867                  */
1868                 mutex_lock(&clt->paths_mutex);
1869                 clt->queue_depth = sess->queue_depth;
1870                 clt->max_io_size = min_not_zero(sess->max_io_size,
1871                                                 clt->max_io_size);
1872                 mutex_unlock(&clt->paths_mutex);
1873
1874                 /*
1875                  * Cache the hca_port and hca_name for sysfs
1876                  */
1877                 sess->hca_port = con->c.cm_id->port_num;
1878                 scnprintf(sess->hca_name, sizeof(sess->hca_name),
1879                           "%s", sess->s.dev->ib_dev->name);
1880                 sess->s.src_addr = con->c.cm_id->route.addr.src_addr;
1881                 /* set for_new_clt, to allow future reconnect on any path */
1882                 sess->for_new_clt = 1;
1883         }
1884
1885         return 0;
1886 }
1887
1888 static inline void flag_success_on_conn(struct rtrs_clt_con *con)
1889 {
1890         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1891
1892         atomic_inc(&sess->connected_cnt);
1893         con->cm_err = 1;
1894 }
1895
1896 static int rtrs_rdma_conn_rejected(struct rtrs_clt_con *con,
1897                                     struct rdma_cm_event *ev)
1898 {
1899         struct rtrs_sess *s = con->c.sess;
1900         const struct rtrs_msg_conn_rsp *msg;
1901         const char *rej_msg;
1902         int status, errno;
1903         u8 data_len;
1904
1905         status = ev->status;
1906         rej_msg = rdma_reject_msg(con->c.cm_id, status);
1907         msg = rdma_consumer_reject_data(con->c.cm_id, ev, &data_len);
1908
1909         if (msg && data_len >= sizeof(*msg)) {
1910                 errno = (int16_t)le16_to_cpu(msg->errno);
1911                 if (errno == -EBUSY)
1912                         rtrs_err(s,
1913                                   "Previous session still exists on the server, please reconnect later\n");
1914                 else
1915                         rtrs_err(s,
1916                                   "Connect rejected: status %d (%s), rtrs errno %d\n",
1917                                   status, rej_msg, errno);
1918         } else {
1919                 rtrs_err(s,
1920                           "Connect rejected but with malformed message: status %d (%s)\n",
1921                           status, rej_msg);
1922         }
1923
1924         return -ECONNRESET;
1925 }
1926
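/*
 * Move the session to CLOSING and let close_work do the teardown; with
 * @wait the caller blocks until that work has finished.
 */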
1927 void rtrs_clt_close_conns(struct rtrs_clt_sess *sess, bool wait)
1928 {
1929         if (rtrs_clt_change_state_get_old(sess, RTRS_CLT_CLOSING, NULL))
1930                 queue_work(rtrs_wq, &sess->close_work);
1931         if (wait)
1932                 flush_work(&sess->close_work);
1933 }
1934
1935 static inline void flag_error_on_conn(struct rtrs_clt_con *con, int cm_err)
1936 {
1937         if (con->cm_err == 1) {
1938                 struct rtrs_clt_sess *sess;
1939
1940                 sess = to_clt_sess(con->c.sess);
1941                 if (atomic_dec_and_test(&sess->connected_cnt))
1943                         wake_up(&sess->state_wq);
1944         }
1945         con->cm_err = cm_err;
1946 }
1947
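/*
 * RDMA CM event handler: a successful establishment is signalled via
 * cm_err == 1 plus a state_wq wakeup, fatal events are mapped to a
 * negative cm_err and handed to rtrs_rdma_error_recovery(), and
 * DEVICE_REMOVAL queues a full session close.  Always returns 0 so the
 * CM core does not destroy the cm_id underneath us.
 */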
1948 static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id *cm_id,
1949                                      struct rdma_cm_event *ev)
1950 {
1951         struct rtrs_clt_con *con = cm_id->context;
1952         struct rtrs_sess *s = con->c.sess;
1953         struct rtrs_clt_sess *sess = to_clt_sess(s);
1954         int cm_err = 0;
1955
1956         switch (ev->event) {
1957         case RDMA_CM_EVENT_ADDR_RESOLVED:
1958                 cm_err = rtrs_rdma_addr_resolved(con);
1959                 break;
1960         case RDMA_CM_EVENT_ROUTE_RESOLVED:
1961                 cm_err = rtrs_rdma_route_resolved(con);
1962                 break;
1963         case RDMA_CM_EVENT_ESTABLISHED:
1964                 cm_err = rtrs_rdma_conn_established(con, ev);
1965                 if (!cm_err) {
1966                         /*
1967                          * Report success and wake up. Here we abuse state_wq,
1968                          * i.e. wake up without state change, but we set cm_err.
1969                          */
1970                         flag_success_on_conn(con);
1971                         wake_up(&sess->state_wq);
1972                         return 0;
1973                 }
1974                 break;
1975         case RDMA_CM_EVENT_REJECTED:
1976                 cm_err = rtrs_rdma_conn_rejected(con, ev);
1977                 break;
1978         case RDMA_CM_EVENT_DISCONNECTED:
1979                 /* No message for disconnecting */
1980                 cm_err = -ECONNRESET;
1981                 break;
1982         case RDMA_CM_EVENT_CONNECT_ERROR:
1983         case RDMA_CM_EVENT_UNREACHABLE:
1984         case RDMA_CM_EVENT_ADDR_CHANGE:
1985         case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1986                 rtrs_wrn(s, "CM error (CM event: %s, err: %d)\n",
1987                          rdma_event_msg(ev->event), ev->status);
1988                 cm_err = -ECONNRESET;
1989                 break;
1990         case RDMA_CM_EVENT_ADDR_ERROR:
1991         case RDMA_CM_EVENT_ROUTE_ERROR:
1992                 rtrs_wrn(s, "CM error (CM event: %s, err: %d)\n",
1993                          rdma_event_msg(ev->event), ev->status);
1994                 cm_err = -EHOSTUNREACH;
1995                 break;
1996         case RDMA_CM_EVENT_DEVICE_REMOVAL:
1997                 /*
1998                  * Device removal is a special case.  Queue close and return 0.
1999                  */
2000                 rtrs_clt_close_conns(sess, false);
2001                 return 0;
2002         default:
2003                 rtrs_err(s, "Unexpected RDMA CM error (CM event: %s, err: %d)\n",
2004                          rdma_event_msg(ev->event), ev->status);
2005                 cm_err = -ECONNRESET;
2006                 break;
2007         }
2008
2009         if (cm_err) {
2010                 /*
2011                  * A cm error makes sense only while the connection is being
2012                  * established, in other cases we rely on the normal reconnect procedure.
2013                  */
2014                 flag_error_on_conn(con, cm_err);
2015                 rtrs_rdma_error_recovery(con);
2016         }
2017
2018         return 0;
2019 }
2020
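/*
 * create_cm() - create the rdma_cm_id of a connection and start address
 * resolution; the rest of the establishment is driven by
 * rtrs_clt_rdma_cm_handler().  Waits on state_wq until cm_err is set or
 * the session leaves CONNECTING, bounded by RTRS_CONNECT_TIMEOUT_MS.
 */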
2021 static int create_cm(struct rtrs_clt_con *con)
2022 {
2023         struct rtrs_sess *s = con->c.sess;
2024         struct rtrs_clt_sess *sess = to_clt_sess(s);
2025         struct rdma_cm_id *cm_id;
2026         int err;
2027
2028         cm_id = rdma_create_id(&init_net, rtrs_clt_rdma_cm_handler, con,
2029                                sess->s.dst_addr.ss_family == AF_IB ?
2030                                RDMA_PS_IB : RDMA_PS_TCP, IB_QPT_RC);
2031         if (IS_ERR(cm_id)) {
2032                 err = PTR_ERR(cm_id);
2033                 rtrs_err(s, "Failed to create CM ID, err: %d\n", err);
2034
2035                 return err;
2036         }
2037         con->c.cm_id = cm_id;
2038         con->cm_err = 0;
2039         /* allow the port to be reused */
2040         err = rdma_set_reuseaddr(cm_id, 1);
2041         if (err != 0) {
2042                 rtrs_err(s, "Set address reuse failed, err: %d\n", err);
2043                 goto destroy_cm;
2044         }
2045         err = rdma_resolve_addr(cm_id, (struct sockaddr *)&sess->s.src_addr,
2046                                 (struct sockaddr *)&sess->s.dst_addr,
2047                                 RTRS_CONNECT_TIMEOUT_MS);
2048         if (err) {
2049                 rtrs_err(s, "Failed to resolve address, err: %d\n", err);
2050                 goto destroy_cm;
2051         }
2052         /*
2053          * Combine connection status and session events. This is needed
2054          * to wait for two possible cases: cm_err holds something meaningful
2055          * or the session state was changed to error by device removal.
2056          */
2057         err = wait_event_interruptible_timeout(
2058                         sess->state_wq,
2059                         con->cm_err || sess->state != RTRS_CLT_CONNECTING,
2060                         msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
2061         if (err == 0 || err == -ERESTARTSYS) {
2062                 if (err == 0)
2063                         err = -ETIMEDOUT;
2064                 /* Timed out or interrupted */
2065                 goto errr;
2066         }
2067         if (con->cm_err < 0) {
2068                 err = con->cm_err;
2069                 goto errr;
2070         }
2071         if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTING) {
2072                 /* Device removal */
2073                 err = -ECONNABORTED;
2074                 goto errr;
2075         }
2076
2077         return 0;
2078
2079 errr:
2080         stop_cm(con);
2081         mutex_lock(&con->con_mutex);
2082         destroy_con_cq_qp(con);
2083         mutex_unlock(&con->con_mutex);
2084 destroy_cm:
2085         destroy_cm(con);
2086
2087         return err;
2088 }
2089
2090 static void rtrs_clt_sess_up(struct rtrs_clt_sess *sess)
2091 {
2092         struct rtrs_clt *clt = sess->clt;
2093         int up;
2094
2095         /*
2096          * We can fire RECONNECTED event only when all paths were
2097          * connected on rtrs_clt_open(), then each was disconnected
2098          * and the first one connected again.  That's why this nasty
2099          * game with counter value.
2100          */
2101
2102         mutex_lock(&clt->paths_ev_mutex);
2103         up = ++clt->paths_up;
2104         /*
2105          * Here it is safe to access paths num directly since up counter
2106          * is greater than MAX_PATHS_NUM only while rtrs_clt_open() is
2107          * in progress, thus paths removals are impossible.
2108          */
2109         if (up > MAX_PATHS_NUM && up == MAX_PATHS_NUM + clt->paths_num)
2110                 clt->paths_up = clt->paths_num;
2111         else if (up == 1)
2112                 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_RECONNECTED);
2113         mutex_unlock(&clt->paths_ev_mutex);
2114
2115         /* Mark session as established */
2116         sess->established = true;
2117         sess->reconnect_attempts = 0;
2118         sess->stats->reconnects.successful_cnt++;
2119 }
2120
2121 static void rtrs_clt_sess_down(struct rtrs_clt_sess *sess)
2122 {
2123         struct rtrs_clt *clt = sess->clt;
2124
2125         if (!sess->established)
2126                 return;
2127
2128         sess->established = false;
2129         mutex_lock(&clt->paths_ev_mutex);
2130         WARN_ON(!clt->paths_up);
2131         if (--clt->paths_up == 0)
2132                 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_DISCONNECTED);
2133         mutex_unlock(&clt->paths_ev_mutex);
2134 }
2135
2136 static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess *sess)
2137 {
2138         struct rtrs_clt_con *con;
2139         unsigned int cid;
2140
2141         WARN_ON(READ_ONCE(sess->state) == RTRS_CLT_CONNECTED);
2142
2143         /*
2144          * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
2145          * exactly in between.  Start destroying after it finishes.
2146          */
2147         mutex_lock(&sess->init_mutex);
2148         mutex_unlock(&sess->init_mutex);
2149
2150         /*
2151          * All IO paths must observe !CONNECTED state before we
2152          * free everything.
2153          */
2154         synchronize_rcu();
2155
2156         rtrs_stop_hb(&sess->s);
2157
2158         /*
2159          * The order is utterly crucial: first disconnect and complete all
2160          * rdma requests with error (thus set in_use=false for requests),
2161          * then fail outstanding requests checking in_use for each, and
2162          * eventually notify upper layer about session disconnection.
2163          */
2164
2165         for (cid = 0; cid < sess->s.con_num; cid++) {
2166                 if (!sess->s.con[cid])
2167                         break;
2168                 con = to_clt_con(sess->s.con[cid]);
2169                 stop_cm(con);
2170         }
2171         fail_all_outstanding_reqs(sess);
2172         free_sess_reqs(sess);
2173         rtrs_clt_sess_down(sess);
2174
2175         /*
2176          * Wait for graceful shutdown, namely when peer side invokes
2177          * rdma_disconnect(). 'connected_cnt' is decremented only on
2178          * CM events, thus if the other side has crashed and hb has detected
2179          * that something is wrong, we will be stuck here for exactly the
2180          * timeout, since CM does not fire anything.  That is fine, we are
2181          * not in a hurry.
2182          */
2183         wait_event_timeout(sess->state_wq, !atomic_read(&sess->connected_cnt),
2184                            msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
2185
2186         for (cid = 0; cid < sess->s.con_num; cid++) {
2187                 if (!sess->s.con[cid])
2188                         break;
2189                 con = to_clt_con(sess->s.con[cid]);
2190                 mutex_lock(&con->con_mutex);
2191                 destroy_con_cq_qp(con);
2192                 mutex_unlock(&con->con_mutex);
2193                 destroy_cm(con);
2194                 destroy_con(con);
2195         }
2196 }
2197
2198 static inline bool xchg_sessions(struct rtrs_clt_sess __rcu **rcu_ppcpu_path,
2199                                  struct rtrs_clt_sess *sess,
2200                                  struct rtrs_clt_sess *next)
2201 {
2202         struct rtrs_clt_sess **ppcpu_path;
2203
2204         /* Call cmpxchg() without sparse warnings */
2205         ppcpu_path = (typeof(ppcpu_path))rcu_ppcpu_path;
2206         return sess == cmpxchg(ppcpu_path, sess, next);
2207 }
2208
2209 static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_sess *sess)
2210 {
2211         struct rtrs_clt *clt = sess->clt;
2212         struct rtrs_clt_sess *next;
2213         bool wait_for_grace = false;
2214         int cpu;
2215
2216         mutex_lock(&clt->paths_mutex);
2217         list_del_rcu(&sess->s.entry);
2218
2219         /* Make sure everybody observes path removal. */
2220         synchronize_rcu();
2221
2222         /*
2223          * At this point nobody sees @sess in the list, but still we have
2224          * dangling pointer @pcpu_path which _can_ point to @sess.  Since
2225          * nobody can observe @sess in the list, we guarantee that IO path
2226          * will not assign @sess to @pcpu_path, i.e. @pcpu_path can be equal
2227          * to @sess, but can never again become @sess.
2228          */
2229
2230         /*
2231          * Decrement the paths number only after the grace period, because
2232          * the caller of do_each_path() must first observe the list without
2233          * the path and only then the decremented paths number.
2234          *
2235          * Otherwise there can be the following situation:
2236          *    o Two paths exist and IO is coming.
2237          *    o One path is removed:
2238          *      CPU#0                          CPU#1
2239          *      do_each_path():                rtrs_clt_remove_path_from_arr():
2240          *          path = get_next_path()
2241          *          ^^^                            list_del_rcu(path)
2242          *          [!CONNECTED path]              clt->paths_num--
2243          *                                              ^^^^^^^^^
2244          *          load clt->paths_num                 from 2 to 1
2245          *                    ^^^^^^^^^
2246          *                    sees 1
2247          *
2248          *      path is observed as !CONNECTED, but do_each_path() loop
2249          *      ends, because expression i < clt->paths_num is false.
2250          */
2251         clt->paths_num--;
2252
2253         /*
2254          * Get @next connection from current @sess which is going to be
2255          * removed.  If @sess is the last element, then @next is NULL.
2256          */
2257         rcu_read_lock();
2258         next = list_next_or_null_rr_rcu(&clt->paths_list, &sess->s.entry,
2259                                         typeof(*next), s.entry);
2260         rcu_read_unlock();
2261
2262         /*
2263          * @pcpu paths can still point to the path which is going to be
2264          * removed, so change the pointer manually.
2265          */
2266         for_each_possible_cpu(cpu) {
2267                 struct rtrs_clt_sess __rcu **ppcpu_path;
2268
2269                 ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu);
2270                 if (rcu_dereference_protected(*ppcpu_path,
2271                         lockdep_is_held(&clt->paths_mutex)) != sess)
2272                         /*
2273                          * synchronize_rcu() was called just after deleting
2274                          * entry from the list, thus IO code path cannot
2275                          * change pointer back to the pointer which is going
2276                          * to be removed, we are safe here.
2277                          */
2278                         continue;
2279
2280                 /*
2281                  * We race with IO code path, which also changes pointer,
2282                  * thus we have to be careful not to overwrite it.
2283                  */
2284                 if (xchg_sessions(ppcpu_path, sess, next))
2285                         /*
2286                          * @ppcpu_path was successfully replaced with @next,
2287                          * that means that someone could also pick up the
2288                          * @sess and dereferencing it right now, so wait for
2289                          * a grace period is required.
2290                          */
2291                         wait_for_grace = true;
2292         }
2293         if (wait_for_grace)
2294                 synchronize_rcu();
2295
2296         mutex_unlock(&clt->paths_mutex);
2297 }
2298
2299 static void rtrs_clt_add_path_to_arr(struct rtrs_clt_sess *sess)
2300 {
2301         struct rtrs_clt *clt = sess->clt;
2302
2303         mutex_lock(&clt->paths_mutex);
2304         clt->paths_num++;
2305
2306         list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
2307         mutex_unlock(&clt->paths_mutex);
2308 }
2309
2310 static void rtrs_clt_close_work(struct work_struct *work)
2311 {
2312         struct rtrs_clt_sess *sess;
2313
2314         sess = container_of(work, struct rtrs_clt_sess, close_work);
2315
2316         cancel_delayed_work_sync(&sess->reconnect_dwork);
2317         rtrs_clt_stop_and_destroy_conns(sess);
2318         rtrs_clt_change_state_get_old(sess, RTRS_CLT_CLOSED, NULL);
2319 }
2320
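/*
 * init_conns() - create a connection and a CM id for every cid, allocate
 * the request IUs and start the heartbeat.  On failure everything created
 * so far is torn down and the session is moved to CONNECTING_ERR so the
 * reconnect machinery keeps running.
 */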
2321 static int init_conns(struct rtrs_clt_sess *sess)
2322 {
2323         unsigned int cid;
2324         int err;
2325
2326         /*
2327          * On every new set of session connections increase the reconnect
2328          * counter to avoid clashes with previous, not yet closed,
2329          * sessions on the server side.
2330          */
2331         sess->s.recon_cnt++;
2332
2333         /* Establish all RDMA connections  */
2334         for (cid = 0; cid < sess->s.con_num; cid++) {
2335                 err = create_con(sess, cid);
2336                 if (err)
2337                         goto destroy;
2338
2339                 err = create_cm(to_clt_con(sess->s.con[cid]));
2340                 if (err) {
2341                         destroy_con(to_clt_con(sess->s.con[cid]));
2342                         goto destroy;
2343                 }
2344         }
2345         err = alloc_sess_reqs(sess);
2346         if (err)
2347                 goto destroy;
2348
2349         rtrs_start_hb(&sess->s);
2350
2351         return 0;
2352
2353 destroy:
2354         while (cid--) {
2355                 struct rtrs_clt_con *con = to_clt_con(sess->s.con[cid]);
2356
2357                 stop_cm(con);
2358
2359                 mutex_lock(&con->con_mutex);
2360                 destroy_con_cq_qp(con);
2361                 mutex_unlock(&con->con_mutex);
2362                 destroy_cm(con);
2363                 destroy_con(con);
2364         }
2365         /*
2366          * If we've never taken the async path and got an error, say,
2367          * doing rdma_resolve_addr(), switch to CONNECTING_ERR state
2368          * manually to keep reconnecting.
2369          */
2370         rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING_ERR, NULL);
2371
2372         return err;
2373 }
2374
2375 static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
2376 {
2377         struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
2378         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
2379         struct rtrs_iu *iu;
2380
2381         iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
2382         rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
2383
2384         if (wc->status != IB_WC_SUCCESS) {
2385                 rtrs_err(sess->clt, "Sess info request send failed: %s\n",
2386                           ib_wc_status_msg(wc->status));
2387                 rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING_ERR, NULL);
2388                 return;
2389         }
2390
2391         rtrs_clt_update_wc_stats(con);
2392 }
2393
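/*
 * process_info_rsp() - map the server's buffer descriptors to rbufs.
 *
 * sg_cnt must divide queue_depth and every descriptor must cover a whole
 * number of chunk_size chunks; e.g. with queue_depth 128 and sg_cnt 2
 * the two descriptors together must describe all 128 chunks.  Each
 * descriptor is split into per-chunk (addr, rkey) entries in sess->rbufs.
 */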
2394 static int process_info_rsp(struct rtrs_clt_sess *sess,
2395                             const struct rtrs_msg_info_rsp *msg)
2396 {
2397         unsigned int sg_cnt, total_len;
2398         int i, sgi;
2399
2400         sg_cnt = le16_to_cpu(msg->sg_cnt);
2401         if (!sg_cnt || (sess->queue_depth % sg_cnt)) {
2402                 rtrs_err(sess->clt, "Incorrect sg_cnt %d, not a divisor of queue_depth\n",
2403                           sg_cnt);
2404                 return -EINVAL;
2405         }
2406
2407         /*
2408          * Check if IB immediate data size is enough to hold the mem_id and
2409          * the offset inside the memory chunk.
2410          */
2411         if ((ilog2(sg_cnt - 1) + 1) + (ilog2(sess->chunk_size - 1) + 1) >
2412             MAX_IMM_PAYL_BITS) {
2413                 rtrs_err(sess->clt,
2414                           "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n",
2415                           MAX_IMM_PAYL_BITS, sg_cnt, sess->chunk_size);
2416                 return -EINVAL;
2417         }
2418         total_len = 0;
2419         for (sgi = 0, i = 0; sgi < sg_cnt && i < sess->queue_depth; sgi++) {
2420                 const struct rtrs_sg_desc *desc = &msg->desc[sgi];
2421                 u32 len, rkey;
2422                 u64 addr;
2423
2424                 addr = le64_to_cpu(desc->addr);
2425                 rkey = le32_to_cpu(desc->key);
2426                 len  = le32_to_cpu(desc->len);
2427
2428                 total_len += len;
2429
2430                 if (!len || (len % sess->chunk_size)) {
2431                         rtrs_err(sess->clt, "Incorrect [%d].len %d\n", sgi,
2432                                   len);
2433                         return -EINVAL;
2434                 }
2435                 for ( ; len && i < sess->queue_depth; i++) {
2436                         sess->rbufs[i].addr = addr;
2437                         sess->rbufs[i].rkey = rkey;
2438
2439                         len  -= sess->chunk_size;
2440                         addr += sess->chunk_size;
2441                 }
2442         }
2443         /* Sanity check */
2444         if (sgi != sg_cnt || i != sess->queue_depth) {
2445                 rtrs_err(sess->clt, "Incorrect sg vector, not fully mapped\n");
2446                 return -EINVAL;
2447         }
2448         if (total_len != sess->chunk_size * sess->queue_depth) {
2449                 rtrs_err(sess->clt, "Incorrect total_len %d\n", total_len);
2450                 return -EINVAL;
2451         }
2452
2453         return 0;
2454 }
2455
2456 static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
2457 {
2458         struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
2459         struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
2460         struct rtrs_msg_info_rsp *msg;
2461         enum rtrs_clt_state state;
2462         struct rtrs_iu *iu;
2463         size_t rx_sz;
2464         int err;
2465
2466         state = RTRS_CLT_CONNECTING_ERR;
2467
2468         WARN_ON(con->c.cid);
2469         iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
2470         if (wc->status != IB_WC_SUCCESS) {
2471                 rtrs_err(sess->clt, "Sess info response recv failed: %s\n",
2472                           ib_wc_status_msg(wc->status));
2473                 goto out;
2474         }
2475         WARN_ON(wc->opcode != IB_WC_RECV);
2476
2477         if (wc->byte_len < sizeof(*msg)) {
2478                 rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
2479                           wc->byte_len);
2480                 goto out;
2481         }
2482         ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
2483                                    iu->size, DMA_FROM_DEVICE);
2484         msg = iu->buf;
2485         if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP) {
2486                 rtrs_err(sess->clt, "Sess info response is malformed: type %d\n",
2487                           le16_to_cpu(msg->type));
2488                 goto out;
2489         }
2490         rx_sz  = sizeof(*msg);
2491         rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt);
2492         if (wc->byte_len < rx_sz) {
2493                 rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
2494                           wc->byte_len);
2495                 goto out;
2496         }
2497         err = process_info_rsp(sess, msg);
2498         if (err)
2499                 goto out;
2500
2501         err = post_recv_sess(sess);
2502         if (err)
2503                 goto out;
2504
2505         state = RTRS_CLT_CONNECTED;
2506
2507 out:
2508         rtrs_clt_update_wc_stats(con);
2509         rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
2510         rtrs_clt_change_state_get_old(sess, state, NULL);
2511 }
2512
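/*
 * rtrs_send_sess_info() - perform the info handshake on the user
 * connection: post a receive for the info response, send the info
 * request carrying the session name and wait until the response handler
 * moves the session out of CONNECTING.
 */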
2513 static int rtrs_send_sess_info(struct rtrs_clt_sess *sess)
2514 {
2515         struct rtrs_clt_con *usr_con = to_clt_con(sess->s.con[0]);
2516         struct rtrs_msg_info_req *msg;
2517         struct rtrs_iu *tx_iu, *rx_iu;
2518         size_t rx_sz;
2519         int err;
2520
2521         rx_sz  = sizeof(struct rtrs_msg_info_rsp);
2522         rx_sz += sizeof(struct rtrs_sg_desc) * sess->queue_depth;
2523
2524         tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL,
2525                                sess->s.dev->ib_dev, DMA_TO_DEVICE,
2526                                rtrs_clt_info_req_done);
2527         rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
2528                                DMA_FROM_DEVICE, rtrs_clt_info_rsp_done);
2529         if (!tx_iu || !rx_iu) {
2530                 err = -ENOMEM;
2531                 goto out;
2532         }
2533         /* Prepare for getting info response */
2534         err = rtrs_iu_post_recv(&usr_con->c, rx_iu);
2535         if (err) {
2536                 rtrs_err(sess->clt, "rtrs_iu_post_recv(), err: %d\n", err);
2537                 goto out;
2538         }
2539         rx_iu = NULL;
2540
2541         msg = tx_iu->buf;
2542         msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ);
2543         memcpy(msg->sessname, sess->s.sessname, sizeof(msg->sessname));
2544
2545         ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
2546                                       tx_iu->size, DMA_TO_DEVICE);
2547
2548         /* Send info request */
2549         err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL);
2550         if (err) {
2551                 rtrs_err(sess->clt, "rtrs_iu_post_send(), err: %d\n", err);
2552                 goto out;
2553         }
2554         tx_iu = NULL;
2555
2556         /* Wait for state change */
2557         wait_event_interruptible_timeout(sess->state_wq,
2558                                          sess->state != RTRS_CLT_CONNECTING,
2559                                          msecs_to_jiffies(
2560                                                  RTRS_CONNECT_TIMEOUT_MS));
2561         if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTED) {
2562                 if (READ_ONCE(sess->state) == RTRS_CLT_CONNECTING_ERR)
2563                         err = -ECONNRESET;
2564                 else
2565                         err = -ETIMEDOUT;
2566         }
2567
2568 out:
2569         if (tx_iu)
2570                 rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1);
2571         if (rx_iu)
2572                 rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1);
2573         if (err)
2574                 /* If we've never taken the async path because of malloc problems */
2575                 rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING_ERR, NULL);
2576
2577         return err;
2578 }
2579
2580 /**
2581  * init_sess() - establishes all session connections and does handshake
2582  * @sess: client session.
2583  * In case of error the full close or reconnect procedure should be taken,
2584  * because the reconnect or close async work may already have been started.
2585  */
2586 static int init_sess(struct rtrs_clt_sess *sess)
2587 {
2588         int err;
2589         char str[NAME_MAX];
2590         struct rtrs_addr path = {
2591                 .src = &sess->s.src_addr,
2592                 .dst = &sess->s.dst_addr,
2593         };
2594
2595         rtrs_addr_to_str(&path, str, sizeof(str));
2596
2597         mutex_lock(&sess->init_mutex);
2598         err = init_conns(sess);
2599         if (err) {
2600                 rtrs_err(sess->clt,
2601                          "init_conns() failed: err=%d path=%s [%s:%u]\n", err,
2602                          str, sess->hca_name, sess->hca_port);
2603                 goto out;
2604         }
2605         err = rtrs_send_sess_info(sess);
2606         if (err) {
2607                 rtrs_err(
2608                         sess->clt,
2609                         "rtrs_send_sess_info() failed: err=%d path=%s [%s:%u]\n",
2610                         err, str, sess->hca_name, sess->hca_port);
2611                 goto out;
2612         }
2613         rtrs_clt_sess_up(sess);
2614 out:
2615         mutex_unlock(&sess->init_mutex);
2616
2617         return err;
2618 }
2619
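/*
 * Delayed work driving reconnects: close the session for good once
 * max_reconnect_attempts is exceeded, otherwise tear down the old
 * connections, back off for RTRS_RECONNECT_BACKOFF ms and run
 * init_sess() again; on failure the work re-arms itself with the
 * configured delay plus a small random component.
 */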
2620 static void rtrs_clt_reconnect_work(struct work_struct *work)
2621 {
2622         struct rtrs_clt_sess *sess;
2623         struct rtrs_clt *clt;
2624         unsigned int delay_ms;
2625         int err;
2626
2627         sess = container_of(to_delayed_work(work), struct rtrs_clt_sess,
2628                             reconnect_dwork);
2629         clt = sess->clt;
2630
2631         if (READ_ONCE(sess->state) != RTRS_CLT_RECONNECTING)
2632                 return;
2633
2634         if (sess->reconnect_attempts >= clt->max_reconnect_attempts) {
2635                 /* Close a session completely if max attempts is reached */
2636                 rtrs_clt_close_conns(sess, false);
2637                 return;
2638         }
2639         sess->reconnect_attempts++;
2640
2641         /* Stop everything */
2642         rtrs_clt_stop_and_destroy_conns(sess);
2643         msleep(RTRS_RECONNECT_BACKOFF);
2644         if (rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING, NULL)) {
2645                 err = init_sess(sess);
2646                 if (err)
2647                         goto reconnect_again;
2648         }
2649
2650         return;
2651
2652 reconnect_again:
2653         if (rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING, NULL)) {
2654                 sess->stats->reconnects.fail_cnt++;
2655                 delay_ms = clt->reconnect_delay_sec * 1000;
2656                 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
2657                                    msecs_to_jiffies(delay_ms +
2658                                                     prandom_u32() %
2659                                                     RTRS_RECONNECT_SEED));
2660         }
2661 }
2662
2663 static void rtrs_clt_dev_release(struct device *dev)
2664 {
2665         struct rtrs_clt *clt = container_of(dev, struct rtrs_clt, dev);
2666
2667         kfree(clt);
2668 }
2669
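/*
 * alloc_clt() - allocate and register the rtrs_clt object shared by all
 * paths of a session: per-CPU multipath state, the path list, the default
 * multipath policy, the sysfs device and its "paths" kobject.  Uevents
 * are suppressed until the sysfs hierarchy is fully created.
 */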
2670 static struct rtrs_clt *alloc_clt(const char *sessname, size_t paths_num,
2671                                   u16 port, size_t pdu_sz, void *priv,
2672                                   void  (*link_ev)(void *priv,
2673                                                    enum rtrs_clt_link_ev ev),
2674                                   unsigned int reconnect_delay_sec,
2675                                   unsigned int max_reconnect_attempts)
2676 {
2677         struct rtrs_clt *clt;
2678         int err;
2679
2680         if (!paths_num || paths_num > MAX_PATHS_NUM)
2681                 return ERR_PTR(-EINVAL);
2682
2683         if (strlen(sessname) >= sizeof(clt->sessname))
2684                 return ERR_PTR(-EINVAL);
2685
2686         clt = kzalloc(sizeof(*clt), GFP_KERNEL);
2687         if (!clt)
2688                 return ERR_PTR(-ENOMEM);
2689
2690         clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path));
2691         if (!clt->pcpu_path) {
2692                 kfree(clt);
2693                 return ERR_PTR(-ENOMEM);
2694         }
2695
2696         uuid_gen(&clt->paths_uuid);
2697         INIT_LIST_HEAD_RCU(&clt->paths_list);
2698         clt->paths_num = paths_num;
2699         clt->paths_up = MAX_PATHS_NUM;
2700         clt->port = port;
2701         clt->pdu_sz = pdu_sz;
2702         clt->max_segments = RTRS_MAX_SEGMENTS;
2703         clt->reconnect_delay_sec = reconnect_delay_sec;
2704         clt->max_reconnect_attempts = max_reconnect_attempts;
2705         clt->priv = priv;
2706         clt->link_ev = link_ev;
2707         clt->mp_policy = MP_POLICY_MIN_INFLIGHT;
2708         strscpy(clt->sessname, sessname, sizeof(clt->sessname));
2709         init_waitqueue_head(&clt->permits_wait);
2710         mutex_init(&clt->paths_ev_mutex);
2711         mutex_init(&clt->paths_mutex);
2712
2713         clt->dev.class = rtrs_clt_dev_class;
2714         clt->dev.release = rtrs_clt_dev_release;
2715         err = dev_set_name(&clt->dev, "%s", sessname);
2716         if (err)
2717                 goto err;
2718         /*
2719          * Suppress user space notification until
2720          * sysfs files are created
2721          */
2722         dev_set_uevent_suppress(&clt->dev, true);
2723         err = device_register(&clt->dev);
        if (err) {
                /* put_device() ends in the release callback, which frees clt */
                free_percpu(clt->pcpu_path);
                put_device(&clt->dev);
                return ERR_PTR(err);
        }
2728
2729         clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj);
2730         if (!clt->kobj_paths) {
2731                 err = -ENOMEM;
2732                 goto err_dev;
2733         }
2734         err = rtrs_clt_create_sysfs_root_files(clt);
2735         if (err) {
2736                 kobject_del(clt->kobj_paths);
2737                 kobject_put(clt->kobj_paths);
2738                 goto err_dev;
2739         }
2740         dev_set_uevent_suppress(&clt->dev, false);
2741         kobject_uevent(&clt->dev.kobj, KOBJ_ADD);
2742
2743         return clt;
err_dev:
        /* the device release callback frees clt on the last put */
        free_percpu(clt->pcpu_path);
        device_unregister(&clt->dev);
        return ERR_PTR(err);
err:
        free_percpu(clt->pcpu_path);
        kfree(clt);
        return ERR_PTR(err);
2750 }
2751
2752 static void free_clt(struct rtrs_clt *clt)
2753 {
2754         free_permits(clt);
2755         free_percpu(clt->pcpu_path);
2756         mutex_destroy(&clt->paths_ev_mutex);
2757         mutex_destroy(&clt->paths_mutex);
2758         /* release callback will free clt in last put */
2759         device_unregister(&clt->dev);
2760 }
2761
2762 /**
2763  * rtrs_clt_open() - Open a session to an RTRS server
2764  * @ops: holds the link event callback and the private pointer.
2765  * @sessname: name of the session
2766  * @paths: Paths to be established defined by their src and dst addresses
2767  * @paths_num: Number of elements in the @paths array
2768  * @port: port to be used by the RTRS session
2769  * @pdu_sz: Size of extra payload which can be accessed after permit allocation.
2770  * @reconnect_delay_sec: time between reconnect tries
2771  * @max_reconnect_attempts: Number of times to reconnect on error before giving
2772  *                          up, 0 for disabled, -1 for forever
2773  * @nr_poll_queues: number of polling mode connections using the IB_POLL_DIRECT flag
2774  *
2775  * Starts session establishment with the rtrs_server. The function can block
2776  * up to ~2000ms before it returns.
2777  *
2778  * Return a valid pointer on success otherwise PTR_ERR.
2779  */
2780 struct rtrs_clt *rtrs_clt_open(struct rtrs_clt_ops *ops,
2781                                  const char *sessname,
2782                                  const struct rtrs_addr *paths,
2783                                  size_t paths_num, u16 port,
2784                                  size_t pdu_sz, u8 reconnect_delay_sec,
2785                                  s16 max_reconnect_attempts, u32 nr_poll_queues)
2786 {
2787         struct rtrs_clt_sess *sess, *tmp;
2788         struct rtrs_clt *clt;
2789         int err, i;
2790
2791         if (strchr(sessname, '/') || strchr(sessname, '.')) {
2792                 pr_err("sessname cannot contain / or .\n");
2793                 err = -EINVAL;
2794                 goto out;
2795         }
2796
2797         clt = alloc_clt(sessname, paths_num, port, pdu_sz, ops->priv,
2798                         ops->link_ev,
2799                         reconnect_delay_sec,
2800                         max_reconnect_attempts);
2801         if (IS_ERR(clt)) {
2802                 err = PTR_ERR(clt);
2803                 goto out;
2804         }
2805         for (i = 0; i < paths_num; i++) {
2806                 struct rtrs_clt_sess *sess;
2807
2808                 sess = alloc_sess(clt, &paths[i], nr_cpu_ids,
2809                                   nr_poll_queues);
2810                 if (IS_ERR(sess)) {
2811                         err = PTR_ERR(sess);
2812                         goto close_all_sess;
2813                 }
2814                 if (!i)
2815                         sess->for_new_clt = 1;
2816                 list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
2817
2818                 err = init_sess(sess);
2819                 if (err) {
2820                         list_del_rcu(&sess->s.entry);
2821                         rtrs_clt_close_conns(sess, true);
2822                         free_percpu(sess->stats->pcpu_stats);
2823                         kfree(sess->stats);
2824                         free_sess(sess);
2825                         goto close_all_sess;
2826                 }
2827
2828                 err = rtrs_clt_create_sess_files(sess);
2829                 if (err) {
2830                         list_del_rcu(&sess->s.entry);
2831                         rtrs_clt_close_conns(sess, true);
2832                         free_percpu(sess->stats->pcpu_stats);
2833                         kfree(sess->stats);
2834                         free_sess(sess);
2835                         goto close_all_sess;
2836                 }
2837         }
2838         err = alloc_permits(clt);
2839         if (err)
2840                 goto close_all_sess;
2841
2842         return clt;
2843
2844 close_all_sess:
2845         list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
2846                 rtrs_clt_destroy_sess_files(sess, NULL);
2847                 rtrs_clt_close_conns(sess, true);
2848                 kobject_put(&sess->kobj);
2849         }
2850         rtrs_clt_destroy_sysfs_root(clt);
2851         free_clt(clt);
2852
2853 out:
2854         return ERR_PTR(err);
2855 }
2856 EXPORT_SYMBOL(rtrs_clt_open);
2857
2858 /**
2859  * rtrs_clt_close() - Close a session
2860  * @clt: Session handle. Session is freed upon return.
2861  */
2862 void rtrs_clt_close(struct rtrs_clt *clt)
2863 {
2864         struct rtrs_clt_sess *sess, *tmp;
2865
2866         /* Firstly forbid sysfs access */
2867         rtrs_clt_destroy_sysfs_root(clt);
2868
2869         /* Now it is safe to iterate over all paths without locks */
2870         list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
2871                 rtrs_clt_close_conns(sess, true);
2872                 rtrs_clt_destroy_sess_files(sess, NULL);
2873                 kobject_put(&sess->kobj);
2874         }
2875         free_clt(clt);
2876 }
2877 EXPORT_SYMBOL(rtrs_clt_close);
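/*
 * Usage sketch (illustrative only, not part of this driver): how a ULP might
 * open a single-path session and close it again.  The link-event handler, the
 * address setup and the numeric parameters (pdu size, reconnect delay,
 * reconnect attempts, poll queues) are assumptions of this sketch, not
 * requirements of the API.
 *
 *	static void my_link_ev(void *priv, enum rtrs_clt_link_ev ev)
 *	{
 *		// react to RTRS_CLT_LINK_EV_RECONNECTED / _DISCONNECTED
 *	}
 *
 *	struct rtrs_clt_ops ops = {
 *		.priv		= my_priv,
 *		.link_ev	= my_link_ev,
 *	};
 *	struct sockaddr_storage dst;
 *	struct rtrs_addr path = { .src = NULL, .dst = &dst };
 *	struct rtrs_clt *clt;
 *
 *	// fill 'dst' from a transport address string beforehand
 *	clt = rtrs_clt_open(&ops, "my_session", &path, 1, port,
 *			    sizeof(struct my_pdu), 5, 3, 0);
 *	if (IS_ERR(clt))
 *		return PTR_ERR(clt);
 *	...
 *	rtrs_clt_close(clt);
 */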
2878
2879 int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_sess *sess)
2880 {
2881         enum rtrs_clt_state old_state;
2882         int err = -EBUSY;
2883         bool changed;
2884
2885         changed = rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING,
2886                                                  &old_state);
2887         if (changed) {
2888                 sess->reconnect_attempts = 0;
2889                 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 0);
2890         }
2891         if (changed || old_state == RTRS_CLT_RECONNECTING) {
2892                 /*
2893                  * flush_delayed_work() queues pending delayed work for
2894                  * immediate execution and waits for it, so flush whether
2895                  * we queued the work just now or it was already pending.
2896                  */
2897                 flush_delayed_work(&sess->reconnect_dwork);
2898                 err = (READ_ONCE(sess->state) ==
2899                        RTRS_CLT_CONNECTED ? 0 : -ENOTCONN);
2900         }
2901
2902         return err;
2903 }
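/*
 * Sketch (illustrative only): how a sysfs "reconnect" store handler might
 * consume the return value above - 0 once the path is CONNECTED again,
 * -EBUSY/-ENOTCONN otherwise.  The handler shape and 'count' are assumptions
 * of the sketch.
 *
 *	err = rtrs_clt_reconnect_from_sysfs(sess);
 *	return err ? err : count;
 */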
2904
2905 int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_sess *sess,
2906                                      const struct attribute *sysfs_self)
2907 {
2908         enum rtrs_clt_state old_state;
2909         bool changed;
2910
2911         /*
2912          * Keep stopping the path until its state is changed to DEAD or
2913          * is observed as DEAD:
2914          * 1. State was changed to DEAD - we were fast and nobody had
2915          *    invoked rtrs_clt_reconnect(), which could have started
2916          *    reconnecting again.
2917          * 2. State was observed as DEAD - someone else is removing the
2918          *    path in parallel.
2919          */
2920         do {
2921                 rtrs_clt_close_conns(sess, true);
2922                 changed = rtrs_clt_change_state_get_old(sess,
2923                                                         RTRS_CLT_DEAD,
2924                                                         &old_state);
2925         } while (!changed && old_state != RTRS_CLT_DEAD);
2926
2927         if (changed) {
2928                 rtrs_clt_remove_path_from_arr(sess);
2929                 rtrs_clt_destroy_sess_files(sess, sysfs_self);
2930                 kobject_put(&sess->kobj);
2931         }
2932
2933         return 0;
2934 }
2935
2936 void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt *clt, int value)
2937 {
2938         clt->max_reconnect_attempts = (unsigned int)value;
2939 }
2940
2941 int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt *clt)
2942 {
2943         return (int)clt->max_reconnect_attempts;
2944 }
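/*
 * Sketch (illustrative only): a sysfs attribute pair built on the two helpers
 * above.  The buffer parsing with kstrtoint()/sysfs_emit() and the handler
 * shapes are assumptions of the sketch.
 *
 *	// store: parse and apply the user supplied limit
 *	err = kstrtoint(buf, 10, &value);
 *	if (err)
 *		return err;
 *	rtrs_clt_set_max_reconnect_attempts(clt, value);
 *	return count;
 *
 *	// show: report the current limit
 *	return sysfs_emit(page, "%d\n",
 *			  rtrs_clt_get_max_reconnect_attempts(clt));
 */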
2945
2946 /**
2947  * rtrs_clt_request() - Request data transfer to/from server via RDMA.
2948  *
2949  * @dir:        READ/WRITE
2950  * @ops:        Confirmation callback and its private pointer.
2951  * @clt:        Session
2952  * @permit:     Preallocated permit
2953  * @vec:        Message that is sent to the server together with the request.
2954  *              The total length of all @vec elements must not exceed IO_MSG_SIZE.
2955  *              Since the message is copied internally it can be allocated on the stack.
2956  * @nr:         Number of elements in @vec.
2957  * @data_len:   Length of the data sent to/from the server.
2958  * @sg:         Pages to be sent/received to/from the server.
2959  * @sg_cnt:     Number of elements in @sg.
2960  *
2961  * Return:
2962  * 0:           Success
2963  * <0:          Error
2964  *
2965  * On dir=READ the rtrs client requests a data transfer from the server to the client.
2966  * The data the server responds with is stored in @sg when the user
2967  * receives the %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL event.
2968  * On dir=WRITE the rtrs client RDMA-writes the data in @sg to the server side.
2969  */
2970 int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops,
2971                      struct rtrs_clt *clt, struct rtrs_permit *permit,
2972                       const struct kvec *vec, size_t nr, size_t data_len,
2973                       struct scatterlist *sg, unsigned int sg_cnt)
2974 {
2975         struct rtrs_clt_io_req *req;
2976         struct rtrs_clt_sess *sess;
2977
2978         enum dma_data_direction dma_dir;
2979         int err = -ECONNABORTED, i;
2980         size_t usr_len, hdr_len;
2981         struct path_it it;
2982
2983         /* Get kvec length */
2984         for (i = 0, usr_len = 0; i < nr; i++)
2985                 usr_len += vec[i].iov_len;
2986
2987         if (dir == READ) {
2988                 hdr_len = sizeof(struct rtrs_msg_rdma_read) +
2989                           sg_cnt * sizeof(struct rtrs_sg_desc);
2990                 dma_dir = DMA_FROM_DEVICE;
2991         } else {
2992                 hdr_len = sizeof(struct rtrs_msg_rdma_write);
2993                 dma_dir = DMA_TO_DEVICE;
2994         }
2995
2996         rcu_read_lock();
2997         for (path_it_init(&it, clt);
2998              (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
2999                 if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)
3000                         continue;
3001
3002                 if (usr_len + hdr_len > sess->max_hdr_size) {
3003                         rtrs_wrn_rl(sess->clt,
3004                                      "%s request failed, user message size is %zu and header length %zu, but max size is %u\n",
3005                                      dir == READ ? "Read" : "Write",
3006                                      usr_len, hdr_len, sess->max_hdr_size);
3007                         err = -EMSGSIZE;
3008                         break;
3009                 }
3010                 req = rtrs_clt_get_req(sess, ops->conf_fn, permit, ops->priv,
3011                                        vec, usr_len, sg, sg_cnt, data_len,
3012                                        dma_dir);
3013                 if (dir == READ)
3014                         err = rtrs_clt_read_req(req);
3015                 else
3016                         err = rtrs_clt_write_req(req);
3017                 if (err) {
3018                         req->in_use = false;
3019                         continue;
3020                 }
3021                 /* Success path */
3022                 break;
3023         }
3024         path_it_deinit(&it);
3025         rcu_read_unlock();
3026
3027         return err;
3028 }
3029 EXPORT_SYMBOL(rtrs_clt_request);
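/*
 * Usage sketch (illustrative only, not part of this driver): issuing a WRITE
 * through rtrs_clt_request().  The request structure 'my_req', the user
 * message 'my_msg', the scatter table 'sgt' and 'data_len' are assumptions of
 * the sketch; the permit is taken with rtrs_clt_get_permit() and should be
 * released with rtrs_clt_put_permit() if submission fails.
 *
 *	static void my_io_conf(void *priv, int errno)
 *	{
 *		// IO finished (errno == 0 on success); complete the request
 *		// and put the permit here
 *	}
 *
 *	struct rtrs_clt_req_ops req_ops = {
 *		.priv		= my_req,
 *		.conf_fn	= my_io_conf,
 *	};
 *	struct kvec vec = {
 *		.iov_base	= &my_msg,	// copied internally, may live on the stack
 *		.iov_len	= sizeof(my_msg),
 *	};
 *	struct rtrs_permit *permit;
 *
 *	permit = rtrs_clt_get_permit(clt, RTRS_IO_CON, RTRS_PERMIT_WAIT);
 *	if (!permit)
 *		return -EBUSY;
 *	err = rtrs_clt_request(WRITE, &req_ops, clt, permit, &vec, 1,
 *			       data_len, sgt.sgl, sgt.nents);
 *	if (err)
 *		rtrs_clt_put_permit(clt, permit);
 */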
3030
3031 int rtrs_clt_rdma_cq_direct(struct rtrs_clt *clt, unsigned int index)
3032 {
3033         /* If there is no path, return -1 so the block layer does not retry */
3034         int cnt = -1;
3035         struct rtrs_con *con;
3036         struct rtrs_clt_sess *sess;
3037         struct path_it it;
3038
3039         rcu_read_lock();
3040         for (path_it_init(&it, clt);
3041              (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
3042                 if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)
3043                         continue;
3044
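                     /* con[0] is the admin connection; IO connections start at index 1 */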
3045                 con = sess->s.con[index + 1];
3046                 cnt = ib_process_cq_direct(con->cq, -1);
3047                 if (cnt)
3048                         break;
3049         }
3050         path_it_deinit(&it);
3051         rcu_read_unlock();
3052
3053         return cnt;
3054 }
3055 EXPORT_SYMBOL(rtrs_clt_rdma_cq_direct);
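/*
 * Sketch (illustrative only): a block driver ULP (rnbd style) typically calls
 * this from its blk-mq poll callback, mapping the hardware queue number to the
 * RTRS poll CQ.  'dev' and its 'clt' pointer are assumptions of the sketch.
 *
 *	// inside the ->poll() callback of the block driver
 *	return rtrs_clt_rdma_cq_direct(dev->clt, hctx->queue_num);
 */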
3056
3057 /**
3058  * rtrs_clt_query() - queries RTRS session attributes
3059  * @clt: session pointer
3060  * @attr: query results for session attributes.
3061  * Return:
3062  *    0 on success
3063  *    -ECOMM            no connection to the server
3064  */
3065 int rtrs_clt_query(struct rtrs_clt *clt, struct rtrs_attrs *attr)
3066 {
3067         if (!rtrs_clt_is_connected(clt))
3068                 return -ECOMM;
3069
3070         attr->queue_depth      = clt->queue_depth;
3071         attr->max_segments     = clt->max_segments;
3072         /* Cap max_io_size to the smaller of the remote buffer size and what the FR pages can map */
3073         attr->max_io_size = min_t(int, clt->max_io_size,
3074                                   clt->max_segments * SZ_4K);
3075
3076         return 0;
3077 }
3078 EXPORT_SYMBOL(rtrs_clt_query);
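/*
 * Usage sketch (illustrative only): sizing a ULP's IO limits from the session
 * attributes.  The SECTOR_SIZE division and the local variables are
 * assumptions of the sketch.
 *
 *	struct rtrs_attrs attrs;
 *
 *	err = rtrs_clt_query(clt, &attrs);
 *	if (err)
 *		return err;
 *	max_hw_sectors	= attrs.max_io_size / SECTOR_SIZE;
 *	max_segments	= attrs.max_segments;
 */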
3079
3080 int rtrs_clt_create_path_from_sysfs(struct rtrs_clt *clt,
3081                                      struct rtrs_addr *addr)
3082 {
3083         struct rtrs_clt_sess *sess;
3084         int err;
3085
3086         sess = alloc_sess(clt, addr, nr_cpu_ids, 0);
3087         if (IS_ERR(sess))
3088                 return PTR_ERR(sess);
3089
3090         mutex_lock(&clt->paths_mutex);
3091         if (clt->paths_num == 0) {
3092                 /*
3093                  * When all paths have been removed from a session,
3094                  * adding the first path back is treated as a new session
3095                  * by the storage server.
3096                  */
3097                 sess->for_new_clt = 1;
3098         }
3099
3100         mutex_unlock(&clt->paths_mutex);
3101
3102         /*
3103          * It is totally safe to add a path in the CONNECTING state:
3104          * incoming IO will never grab it.  It is also important to add
3105          * the path before init_sess(), which fires the LINK_CONNECTED event.
3106          */
3107         rtrs_clt_add_path_to_arr(sess);
3108
3109         err = init_sess(sess);
3110         if (err)
3111                 goto close_sess;
3112
3113         err = rtrs_clt_create_sess_files(sess);
3114         if (err)
3115                 goto close_sess;
3116
3117         return 0;
3118
3119 close_sess:
3120         rtrs_clt_remove_path_from_arr(sess);
3121         rtrs_clt_close_conns(sess, true);
3122         free_percpu(sess->stats->pcpu_stats);
3123         kfree(sess->stats);
3124         free_sess(sess);
3125
3126         return err;
3127 }
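/*
 * Sketch (illustrative only): how a sysfs "add_path" store handler might feed
 * this function.  The address parsing helper, 'buf'/'count' and 'port' are
 * assumptions of the sketch; any helper that fills a struct rtrs_addr with
 * source/destination sockaddrs will do.
 *
 *	struct sockaddr_storage src, dst;
 *	struct rtrs_addr addr = { .src = &src, .dst = &dst };
 *
 *	err = rtrs_addr_to_sockaddr(buf, count, port, &addr);
 *	if (err)
 *		return err;
 *	err = rtrs_clt_create_path_from_sysfs(clt, &addr);
 *	return err ? err : count;
 */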
3128
3129 static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev)
3130 {
3131         if (!(dev->ib_dev->attrs.device_cap_flags &
3132               IB_DEVICE_MEM_MGT_EXTENSIONS)) {
3133                 pr_err("Memory registrations not supported.\n");
3134                 return -ENOTSUPP;
3135         }
3136
3137         return 0;
3138 }
3139
3140 static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
3141         .init = rtrs_clt_ib_dev_init
3142 };
3143
3144 static int __init rtrs_client_init(void)
3145 {
3146         rtrs_rdma_dev_pd_init(0, &dev_pd);
3147
3148         rtrs_clt_dev_class = class_create(THIS_MODULE, "rtrs-client");
3149         if (IS_ERR(rtrs_clt_dev_class)) {
3150                 pr_err("Failed to create rtrs-client dev class\n");
3151                 return PTR_ERR(rtrs_clt_dev_class);
3152         }
3153         rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0);
3154         if (!rtrs_wq) {
3155                 class_destroy(rtrs_clt_dev_class);
3156                 return -ENOMEM;
3157         }
3158
3159         return 0;
3160 }
3161
3162 static void __exit rtrs_client_exit(void)
3163 {
3164         destroy_workqueue(rtrs_wq);
3165         class_destroy(rtrs_clt_dev_class);
3166         rtrs_rdma_dev_pd_deinit(&dev_pd);
3167 }
3168
3169 module_init(rtrs_client_init);
3170 module_exit(rtrs_client_exit);