Merge tag 'timers-urgent-2020-08-14' of git://git.kernel.org/pub/scm/linux/kernel...
[linux-2.6-microblaze.git] / drivers / misc / mic / scif / scif_nodeqp.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Intel MIC Platform Software Stack (MPSS)
4  *
5  * Copyright(c) 2014 Intel Corporation.
6  *
7  * Intel SCIF driver.
8  */
9 #include "../bus/scif_bus.h"
10 #include "scif_peer_bus.h"
11 #include "scif_main.h"
12 #include "scif_nodeqp.h"
13 #include "scif_map.h"
14
15 /*
16  ************************************************************************
17  * SCIF node Queue Pair (QP) setup flow:
18  *
19  * 1) SCIF driver gets probed with a scif_hw_dev via the scif_hw_bus
20  * 2) scif_setup_qp(..) allocates the local qp and calls
21  *      scif_setup_qp_connect(..) which allocates and maps the local
22  *      buffer for the inbound QP
23  * 3) The local node updates the device page with the DMA address of the QP
24  * 4) A delayed work is scheduled (qp_dwork) which periodically reads if
25  *      the peer node has updated its QP DMA address
26  * 5) Once a valid non zero address is found in the QP DMA address field
27  *      in the device page, the local node maps the remote node's QP,
28  *      updates its outbound QP and sends a SCIF_INIT message to the peer
29  * 6) The SCIF_INIT message is received by the peer node QP interrupt bottom
30  *      half handler by calling scif_init(..)
31  * 7) scif_init(..) registers a new SCIF peer node by calling
32  *      scif_peer_register_device(..) which signifies the addition of a new
33  *      SCIF node
34  * 8) On the mgmt node, P2P network setup/teardown is initiated if all the
35  *      remote nodes are online via scif_p2p_setup(..)
36  * 9) For P2P setup, the host maps the remote nodes' aperture and memory
37  *      bars and sends a SCIF_NODE_ADD message to both nodes
38  * 10) As part of scif_nodeadd, both nodes set up their local inbound
39  *      QPs and send a SCIF_NODE_ADD_ACK to the mgmt node
40  * 11) As part of scif_node_add_ack(..) the mgmt node forwards the
41  *      SCIF_NODE_ADD_ACK to the remote nodes
42  * 12) As part of scif_node_add_ack(..) the remote nodes update their
43  *      outbound QPs, make sure they can access memory on the remote node
44  *      and then add a new SCIF peer node by calling
45  *      scif_peer_register_device(..) which signifies the addition of a new
46  *      SCIF node.
47  * 13) The SCIF network is now established across all nodes.
48  *
49  ************************************************************************
50  * SCIF node QP teardown flow (initiated by non mgmt node):
51  *
52  * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
53  * 2) The device page QP DMA address field is updated with 0x0
54  * 3) A non mgmt node now cleans up all local data structures and sends a
55  *      SCIF_EXIT message to the peer and waits for a SCIF_EXIT_ACK
56  * 4) As part of scif_exit(..) handling scif_disconnect_node(..) is called
57  * 5) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the
58  *      peers and waits for a SCIF_NODE_REMOVE_ACK
59  * 6) As part of scif_node_remove(..) a remote node unregisters the peer
60  *      node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
61  * 7) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
62  *      it sends itself a node remove message whose handling cleans up local
63  *      data structures and unregisters the peer node from the SCIF network
64  * 8) The mgmt node sends a SCIF_EXIT_ACK
65  * 9) Upon receipt of the SCIF_EXIT_ACK the node initiating the teardown
66  *      completes the SCIF remove routine
67  * 10) The SCIF network is now torn down for the node initiating the
68  *      teardown sequence
69  *
70  ************************************************************************
71  * SCIF node QP teardown flow (initiated by mgmt node):
72  *
73  * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
74  * 2) The device page QP DMA address field is updated with 0x0
75  * 3) The mgmt node calls scif_disconnect_node(..)
76  * 4) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the peers
77  *      and waits for a SCIF_NODE_REMOVE_ACK
78  * 5) As part of scif_node_remove(..) a remote node unregisters the peer
79  *      node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
80  * 6) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
81  *      it unregisters the peer node from the SCIF network
82  * 7) The mgmt node sends a SCIF_EXIT message and waits for a SCIF_EXIT_ACK.
83  * 8) A non mgmt node upon receipt of a SCIF_EXIT message calls scif_stop(..)
84  *      which would clean up local data structures for all SCIF nodes and
85  *      then send a SCIF_EXIT_ACK back to the mgmt node
86  * 9) Upon receipt of the SCIF_EXIT_ACK the the mgmt node sends itself a node
87  *      remove message whose handling cleans up local data structures and
88  *      destroys any P2P mappings.
89  * 10) The SCIF hardware device for which a remove callback was received is now
90  *      disconnected from the SCIF network.
91  */
92 /*
93  * Initializes "local" data structures for the QP. Allocates the QP
94  * ring buffer (rb) and initializes the "in bound" queue.
95  */
96 int scif_setup_qp_connect(struct scif_qp *qp, dma_addr_t *qp_offset,
97                           int local_size, struct scif_dev *scifdev)
98 {
99         void *local_q = qp->inbound_q.rb_base;
100         int err = 0;
101         u32 tmp_rd = 0;
102
103         spin_lock_init(&qp->send_lock);
104         spin_lock_init(&qp->recv_lock);
105
106         /* Allocate rb only if not already allocated */
107         if (!local_q) {
108                 local_q = kzalloc(local_size, GFP_KERNEL);
109                 if (!local_q) {
110                         err = -ENOMEM;
111                         return err;
112                 }
113         }
114
115         err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
116         if (err)
117                 goto kfree;
118         /*
119          * To setup the inbound_q, the buffer lives locally, the read pointer
120          * is remote and the write pointer is local.
121          */
122         scif_rb_init(&qp->inbound_q,
123                      &tmp_rd,
124                      &qp->local_write,
125                      local_q, get_count_order(local_size));
126         /*
127          * The read pointer is NULL initially and it is unsafe to use the ring
128          * buffer til this changes!
129          */
130         qp->inbound_q.read_ptr = NULL;
131         err = scif_map_single(qp_offset, qp,
132                               scifdev, sizeof(struct scif_qp));
133         if (err)
134                 goto unmap;
135         qp->local_qp = *qp_offset;
136         return err;
137 unmap:
138         scif_unmap_single(qp->local_buf, scifdev, local_size);
139         qp->local_buf = 0;
140 kfree:
141         kfree(local_q);
142         return err;
143 }
144
145 /* When the other side has already done it's allocation, this is called */
146 int scif_setup_qp_accept(struct scif_qp *qp, dma_addr_t *qp_offset,
147                          dma_addr_t phys, int local_size,
148                          struct scif_dev *scifdev)
149 {
150         void *local_q;
151         void *remote_q;
152         struct scif_qp *remote_qp;
153         int remote_size;
154         int err = 0;
155
156         spin_lock_init(&qp->send_lock);
157         spin_lock_init(&qp->recv_lock);
158         /* Start by figuring out where we need to point */
159         remote_qp = scif_ioremap(phys, sizeof(struct scif_qp), scifdev);
160         if (!remote_qp)
161                 return -EIO;
162         qp->remote_qp = remote_qp;
163         if (qp->remote_qp->magic != SCIFEP_MAGIC) {
164                 err = -EIO;
165                 goto iounmap;
166         }
167         qp->remote_buf = remote_qp->local_buf;
168         remote_size = qp->remote_qp->inbound_q.size;
169         remote_q = scif_ioremap(qp->remote_buf, remote_size, scifdev);
170         if (!remote_q) {
171                 err = -EIO;
172                 goto iounmap;
173         }
174         qp->remote_qp->local_write = 0;
175         /*
176          * To setup the outbound_q, the buffer lives in remote memory,
177          * the read pointer is local, the write pointer is remote
178          */
179         scif_rb_init(&qp->outbound_q,
180                      &qp->local_read,
181                      &qp->remote_qp->local_write,
182                      remote_q,
183                      get_count_order(remote_size));
184         local_q = kzalloc(local_size, GFP_KERNEL);
185         if (!local_q) {
186                 err = -ENOMEM;
187                 goto iounmap_1;
188         }
189         err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
190         if (err)
191                 goto kfree;
192         qp->remote_qp->local_read = 0;
193         /*
194          * To setup the inbound_q, the buffer lives locally, the read pointer
195          * is remote and the write pointer is local
196          */
197         scif_rb_init(&qp->inbound_q,
198                      &qp->remote_qp->local_read,
199                      &qp->local_write,
200                      local_q, get_count_order(local_size));
201         err = scif_map_single(qp_offset, qp, scifdev,
202                               sizeof(struct scif_qp));
203         if (err)
204                 goto unmap;
205         qp->local_qp = *qp_offset;
206         return err;
207 unmap:
208         scif_unmap_single(qp->local_buf, scifdev, local_size);
209         qp->local_buf = 0;
210 kfree:
211         kfree(local_q);
212 iounmap_1:
213         scif_iounmap(remote_q, remote_size, scifdev);
214         qp->outbound_q.rb_base = NULL;
215 iounmap:
216         scif_iounmap(qp->remote_qp, sizeof(struct scif_qp), scifdev);
217         qp->remote_qp = NULL;
218         return err;
219 }
220
221 int scif_setup_qp_connect_response(struct scif_dev *scifdev,
222                                    struct scif_qp *qp, u64 payload)
223 {
224         int err = 0;
225         void *r_buf;
226         int remote_size;
227         phys_addr_t tmp_phys;
228
229         qp->remote_qp = scif_ioremap(payload, sizeof(struct scif_qp), scifdev);
230
231         if (!qp->remote_qp) {
232                 err = -ENOMEM;
233                 goto error;
234         }
235
236         if (qp->remote_qp->magic != SCIFEP_MAGIC) {
237                 dev_err(&scifdev->sdev->dev,
238                         "SCIFEP_MAGIC mismatch between self %d remote %d\n",
239                         scif_dev[scif_info.nodeid].node, scifdev->node);
240                 err = -ENODEV;
241                 goto error;
242         }
243
244         tmp_phys = qp->remote_qp->local_buf;
245         remote_size = qp->remote_qp->inbound_q.size;
246         r_buf = scif_ioremap(tmp_phys, remote_size, scifdev);
247
248         if (!r_buf)
249                 return -EIO;
250
251         qp->local_read = 0;
252         scif_rb_init(&qp->outbound_q,
253                      &qp->local_read,
254                      &qp->remote_qp->local_write,
255                      r_buf,
256                      get_count_order(remote_size));
257         /*
258          * Because the node QP may already be processing an INIT message, set
259          * the read pointer so the cached read offset isn't lost
260          */
261         qp->remote_qp->local_read = qp->inbound_q.current_read_offset;
262         /*
263          * resetup the inbound_q now that we know where the
264          * inbound_read really is.
265          */
266         scif_rb_init(&qp->inbound_q,
267                      &qp->remote_qp->local_read,
268                      &qp->local_write,
269                      qp->inbound_q.rb_base,
270                      get_count_order(qp->inbound_q.size));
271 error:
272         return err;
273 }
274
275 static __always_inline void
276 scif_send_msg_intr(struct scif_dev *scifdev)
277 {
278         struct scif_hw_dev *sdev = scifdev->sdev;
279
280         if (scifdev_is_p2p(scifdev))
281                 sdev->hw_ops->send_p2p_intr(sdev, scifdev->rdb, &scifdev->mmio);
282         else
283                 sdev->hw_ops->send_intr(sdev, scifdev->rdb);
284 }
285
286 int scif_qp_response(phys_addr_t phys, struct scif_dev *scifdev)
287 {
288         int err = 0;
289         struct scifmsg msg;
290
291         err = scif_setup_qp_connect_response(scifdev, scifdev->qpairs, phys);
292         if (!err) {
293                 /*
294                  * Now that everything is setup and mapped, we're ready
295                  * to tell the peer about our queue's location
296                  */
297                 msg.uop = SCIF_INIT;
298                 msg.dst.node = scifdev->node;
299                 err = scif_nodeqp_send(scifdev, &msg);
300         }
301         return err;
302 }
303
304 void scif_send_exit(struct scif_dev *scifdev)
305 {
306         struct scifmsg msg;
307         int ret;
308
309         scifdev->exit = OP_IN_PROGRESS;
310         msg.uop = SCIF_EXIT;
311         msg.src.node = scif_info.nodeid;
312         msg.dst.node = scifdev->node;
313         ret = scif_nodeqp_send(scifdev, &msg);
314         if (ret)
315                 goto done;
316         /* Wait for a SCIF_EXIT_ACK message */
317         wait_event_timeout(scif_info.exitwq, scifdev->exit == OP_COMPLETED,
318                            SCIF_NODE_ALIVE_TIMEOUT);
319 done:
320         scifdev->exit = OP_IDLE;
321 }
322
323 int scif_setup_qp(struct scif_dev *scifdev)
324 {
325         int err = 0;
326         int local_size;
327         struct scif_qp *qp;
328
329         local_size = SCIF_NODE_QP_SIZE;
330
331         qp = kzalloc(sizeof(*qp), GFP_KERNEL);
332         if (!qp) {
333                 err = -ENOMEM;
334                 return err;
335         }
336         qp->magic = SCIFEP_MAGIC;
337         scifdev->qpairs = qp;
338         err = scif_setup_qp_connect(qp, &scifdev->qp_dma_addr,
339                                     local_size, scifdev);
340         if (err)
341                 goto free_qp;
342         /*
343          * We're as setup as we can be. The inbound_q is setup, w/o a usable
344          * outbound q.  When we get a message, the read_ptr will be updated,
345          * and we will pull the message.
346          */
347         return err;
348 free_qp:
349         kfree(scifdev->qpairs);
350         scifdev->qpairs = NULL;
351         return err;
352 }
353
354 static void scif_p2p_freesg(struct scatterlist *sg)
355 {
356         kfree(sg);
357 }
358
359 static struct scatterlist *
360 scif_p2p_setsg(phys_addr_t pa, int page_size, int page_cnt)
361 {
362         struct scatterlist *sg;
363         struct page *page;
364         int i;
365
366         sg = kcalloc(page_cnt, sizeof(struct scatterlist), GFP_KERNEL);
367         if (!sg)
368                 return NULL;
369         sg_init_table(sg, page_cnt);
370         for (i = 0; i < page_cnt; i++) {
371                 page = pfn_to_page(pa >> PAGE_SHIFT);
372                 sg_set_page(&sg[i], page, page_size, 0);
373                 pa += page_size;
374         }
375         return sg;
376 }
377
378 /* Init p2p mappings required to access peerdev from scifdev */
379 static struct scif_p2p_info *
380 scif_init_p2p_info(struct scif_dev *scifdev, struct scif_dev *peerdev)
381 {
382         struct scif_p2p_info *p2p;
383         int num_mmio_pages, num_aper_pages, sg_page_shift, err, num_aper_chunks;
384         struct scif_hw_dev *psdev = peerdev->sdev;
385         struct scif_hw_dev *sdev = scifdev->sdev;
386
387         num_mmio_pages = psdev->mmio->len >> PAGE_SHIFT;
388         num_aper_pages = psdev->aper->len >> PAGE_SHIFT;
389
390         p2p = kzalloc(sizeof(*p2p), GFP_KERNEL);
391         if (!p2p)
392                 return NULL;
393         p2p->ppi_sg[SCIF_PPI_MMIO] = scif_p2p_setsg(psdev->mmio->pa,
394                                                     PAGE_SIZE, num_mmio_pages);
395         if (!p2p->ppi_sg[SCIF_PPI_MMIO])
396                 goto free_p2p;
397         p2p->sg_nentries[SCIF_PPI_MMIO] = num_mmio_pages;
398         sg_page_shift = get_order(min(psdev->aper->len, (u64)(1 << 30)));
399         num_aper_chunks = num_aper_pages >> (sg_page_shift - PAGE_SHIFT);
400         p2p->ppi_sg[SCIF_PPI_APER] = scif_p2p_setsg(psdev->aper->pa,
401                                                     1 << sg_page_shift,
402                                                     num_aper_chunks);
403         p2p->sg_nentries[SCIF_PPI_APER] = num_aper_chunks;
404         err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
405                          num_mmio_pages, PCI_DMA_BIDIRECTIONAL);
406         if (err != num_mmio_pages)
407                 goto scif_p2p_free;
408         err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
409                          num_aper_chunks, PCI_DMA_BIDIRECTIONAL);
410         if (err != num_aper_chunks)
411                 goto dma_unmap;
412         p2p->ppi_da[SCIF_PPI_MMIO] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_MMIO]);
413         p2p->ppi_da[SCIF_PPI_APER] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_APER]);
414         p2p->ppi_len[SCIF_PPI_MMIO] = num_mmio_pages;
415         p2p->ppi_len[SCIF_PPI_APER] = num_aper_pages;
416         p2p->ppi_peer_id = peerdev->node;
417         return p2p;
418 dma_unmap:
419         dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
420                      p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
421 scif_p2p_free:
422         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
423         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
424 free_p2p:
425         kfree(p2p);
426         return NULL;
427 }
428
429 /* Uninitialize and release resources from a p2p mapping */
430 static void scif_deinit_p2p_info(struct scif_dev *scifdev,
431                                  struct scif_p2p_info *p2p)
432 {
433         struct scif_hw_dev *sdev = scifdev->sdev;
434
435         dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
436                      p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
437         dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
438                      p2p->sg_nentries[SCIF_PPI_APER], DMA_BIDIRECTIONAL);
439         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
440         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
441         kfree(p2p);
442 }
443
444 /**
445  * scif_node_connect: Respond to SCIF_NODE_CONNECT interrupt message
446  * @scifdev: SCIF device
447  * @dst: Destination node
448  *
449  * Connect the src and dst node by setting up the p2p connection
450  * between them. Management node here acts like a proxy.
451  */
452 static void scif_node_connect(struct scif_dev *scifdev, int dst)
453 {
454         struct scif_dev *dev_j = scifdev;
455         struct scif_dev *dev_i = NULL;
456         struct scif_p2p_info *p2p_ij = NULL;    /* bus addr for j from i */
457         struct scif_p2p_info *p2p_ji = NULL;    /* bus addr for i from j */
458         struct scif_p2p_info *p2p;
459         struct list_head *pos, *tmp;
460         struct scifmsg msg;
461         int err;
462         u64 tmppayload;
463
464         if (dst < 1 || dst > scif_info.maxid)
465                 return;
466
467         dev_i = &scif_dev[dst];
468
469         if (!_scifdev_alive(dev_i))
470                 return;
471         /*
472          * If the p2p connection is already setup or in the process of setting
473          * up then just ignore this request. The requested node will get
474          * informed by SCIF_NODE_ADD_ACK or SCIF_NODE_ADD_NACK
475          */
476         if (!list_empty(&dev_i->p2p)) {
477                 list_for_each_safe(pos, tmp, &dev_i->p2p) {
478                         p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
479                         if (p2p->ppi_peer_id == dev_j->node)
480                                 return;
481                 }
482         }
483         p2p_ij = scif_init_p2p_info(dev_i, dev_j);
484         if (!p2p_ij)
485                 return;
486         p2p_ji = scif_init_p2p_info(dev_j, dev_i);
487         if (!p2p_ji) {
488                 scif_deinit_p2p_info(dev_i, p2p_ij);
489                 return;
490         }
491         list_add_tail(&p2p_ij->ppi_list, &dev_i->p2p);
492         list_add_tail(&p2p_ji->ppi_list, &dev_j->p2p);
493
494         /*
495          * Send a SCIF_NODE_ADD to dev_i, pass it its bus address
496          * as seen from dev_j
497          */
498         msg.uop = SCIF_NODE_ADD;
499         msg.src.node = dev_j->node;
500         msg.dst.node = dev_i->node;
501
502         msg.payload[0] = p2p_ji->ppi_da[SCIF_PPI_APER];
503         msg.payload[1] = p2p_ij->ppi_da[SCIF_PPI_MMIO];
504         msg.payload[2] = p2p_ij->ppi_da[SCIF_PPI_APER];
505         msg.payload[3] = p2p_ij->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;
506
507         err = scif_nodeqp_send(dev_i,  &msg);
508         if (err) {
509                 dev_err(&scifdev->sdev->dev,
510                         "%s %d error %d\n", __func__, __LINE__, err);
511                 return;
512         }
513
514         /* Same as above but to dev_j */
515         msg.uop = SCIF_NODE_ADD;
516         msg.src.node = dev_i->node;
517         msg.dst.node = dev_j->node;
518
519         tmppayload = msg.payload[0];
520         msg.payload[0] = msg.payload[2];
521         msg.payload[2] = tmppayload;
522         msg.payload[1] = p2p_ji->ppi_da[SCIF_PPI_MMIO];
523         msg.payload[3] = p2p_ji->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;
524
525         scif_nodeqp_send(dev_j, &msg);
526 }
527
528 static void scif_p2p_setup(void)
529 {
530         int i, j;
531
532         if (!scif_info.p2p_enable)
533                 return;
534
535         for (i = 1; i <= scif_info.maxid; i++)
536                 if (!_scifdev_alive(&scif_dev[i]))
537                         return;
538
539         for (i = 1; i <= scif_info.maxid; i++) {
540                 for (j = 1; j <= scif_info.maxid; j++) {
541                         struct scif_dev *scifdev = &scif_dev[i];
542
543                         if (i == j)
544                                 continue;
545                         scif_node_connect(scifdev, j);
546                 }
547         }
548 }
549
550 static char *message_types[] = {"BAD",
551                                 "INIT",
552                                 "EXIT",
553                                 "SCIF_EXIT_ACK",
554                                 "SCIF_NODE_ADD",
555                                 "SCIF_NODE_ADD_ACK",
556                                 "SCIF_NODE_ADD_NACK",
557                                 "REMOVE_NODE",
558                                 "REMOVE_NODE_ACK",
559                                 "CNCT_REQ",
560                                 "CNCT_GNT",
561                                 "CNCT_GNTACK",
562                                 "CNCT_GNTNACK",
563                                 "CNCT_REJ",
564                                 "DISCNCT",
565                                 "DISCNT_ACK",
566                                 "CLIENT_SENT",
567                                 "CLIENT_RCVD",
568                                 "SCIF_GET_NODE_INFO",
569                                 "REGISTER",
570                                 "REGISTER_ACK",
571                                 "REGISTER_NACK",
572                                 "UNREGISTER",
573                                 "UNREGISTER_ACK",
574                                 "UNREGISTER_NACK",
575                                 "ALLOC_REQ",
576                                 "ALLOC_GNT",
577                                 "ALLOC_REJ",
578                                 "FREE_PHYS",
579                                 "FREE_VIRT",
580                                 "MUNMAP",
581                                 "MARK",
582                                 "MARK_ACK",
583                                 "MARK_NACK",
584                                 "WAIT",
585                                 "WAIT_ACK",
586                                 "WAIT_NACK",
587                                 "SIGNAL_LOCAL",
588                                 "SIGNAL_REMOTE",
589                                 "SIG_ACK",
590                                 "SIG_NACK"};
591
592 static void
593 scif_display_message(struct scif_dev *scifdev, struct scifmsg *msg,
594                      const char *label)
595 {
596         if (!scif_info.en_msg_log)
597                 return;
598         if (msg->uop > SCIF_MAX_MSG) {
599                 dev_err(&scifdev->sdev->dev,
600                         "%s: unknown msg type %d\n", label, msg->uop);
601                 return;
602         }
603         dev_info(&scifdev->sdev->dev,
604                  "%s: msg type %s, src %d:%d, dest %d:%d payload 0x%llx:0x%llx:0x%llx:0x%llx\n",
605                  label, message_types[msg->uop], msg->src.node, msg->src.port,
606                  msg->dst.node, msg->dst.port, msg->payload[0], msg->payload[1],
607                  msg->payload[2], msg->payload[3]);
608 }
609
610 int _scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
611 {
612         struct scif_qp *qp = scifdev->qpairs;
613         int err = -ENOMEM, loop_cnt = 0;
614
615         scif_display_message(scifdev, msg, "Sent");
616         if (!qp) {
617                 err = -EINVAL;
618                 goto error;
619         }
620         spin_lock(&qp->send_lock);
621
622         while ((err = scif_rb_write(&qp->outbound_q,
623                                     msg, sizeof(struct scifmsg)))) {
624                 mdelay(1);
625 #define SCIF_NODEQP_SEND_TO_MSEC (3 * 1000)
626                 if (loop_cnt++ > (SCIF_NODEQP_SEND_TO_MSEC)) {
627                         err = -ENODEV;
628                         break;
629                 }
630         }
631         if (!err)
632                 scif_rb_commit(&qp->outbound_q);
633         spin_unlock(&qp->send_lock);
634         if (!err) {
635                 if (scifdev_self(scifdev))
636                         /*
637                          * For loopback we need to emulate an interrupt by
638                          * queuing work for the queue handling real node
639                          * Qp interrupts.
640                          */
641                         queue_work(scifdev->intr_wq, &scifdev->intr_bh);
642                 else
643                         scif_send_msg_intr(scifdev);
644         }
645 error:
646         if (err)
647                 dev_dbg(&scifdev->sdev->dev,
648                         "%s %d error %d uop %d\n",
649                          __func__, __LINE__, err, msg->uop);
650         return err;
651 }
652
653 /**
654  * scif_nodeqp_send - Send a message on the node queue pair
655  * @scifdev: Scif Device.
656  * @msg: The message to be sent.
657  */
658 int scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
659 {
660         int err;
661         struct device *spdev = NULL;
662
663         if (msg->uop > SCIF_EXIT_ACK) {
664                 /* Don't send messages once the exit flow has begun */
665                 if (OP_IDLE != scifdev->exit)
666                         return -ENODEV;
667                 spdev = scif_get_peer_dev(scifdev);
668                 if (IS_ERR(spdev)) {
669                         err = PTR_ERR(spdev);
670                         return err;
671                 }
672         }
673         err = _scif_nodeqp_send(scifdev, msg);
674         if (msg->uop > SCIF_EXIT_ACK)
675                 scif_put_peer_dev(spdev);
676         return err;
677 }
678
679 /*
680  * scif_misc_handler:
681  *
682  * Work queue handler for servicing miscellaneous SCIF tasks.
683  * Examples include:
684  * 1) Remote fence requests.
685  * 2) Destruction of temporary registered windows
686  *    created during scif_vreadfrom()/scif_vwriteto().
687  * 3) Cleanup of zombie endpoints.
688  */
689 void scif_misc_handler(struct work_struct *work)
690 {
691         scif_rma_handle_remote_fences();
692         scif_rma_destroy_windows();
693         scif_rma_destroy_tcw_invalid();
694         scif_cleanup_zombie_epd();
695 }
696
697 /**
698  * scif_init() - Respond to SCIF_INIT interrupt message
699  * @scifdev:    Remote SCIF device node
700  * @msg:        Interrupt message
701  */
702 static __always_inline void
703 scif_init(struct scif_dev *scifdev, struct scifmsg *msg)
704 {
705         /*
706          * Allow the thread waiting for device page updates for the peer QP DMA
707          * address to complete initializing the inbound_q.
708          */
709         flush_delayed_work(&scifdev->qp_dwork);
710
711         scif_peer_register_device(scifdev);
712
713         if (scif_is_mgmt_node()) {
714                 mutex_lock(&scif_info.conflock);
715                 scif_p2p_setup();
716                 mutex_unlock(&scif_info.conflock);
717         }
718 }
719
720 /**
721  * scif_exit() - Respond to SCIF_EXIT interrupt message
722  * @scifdev:    Remote SCIF device node
723  * @unused:     Interrupt message (unused)
724  *
725  * This function stops the SCIF interface for the node which sent
726  * the SCIF_EXIT message and starts waiting for that node to
727  * resetup the queue pair again.
728  */
729 static __always_inline void
730 scif_exit(struct scif_dev *scifdev, struct scifmsg *unused)
731 {
732         scifdev->exit_ack_pending = true;
733         if (scif_is_mgmt_node())
734                 scif_disconnect_node(scifdev->node, false);
735         else
736                 scif_stop(scifdev);
737         schedule_delayed_work(&scifdev->qp_dwork,
738                               msecs_to_jiffies(1000));
739 }
740
741 /**
742  * scif_exitack() - Respond to SCIF_EXIT_ACK interrupt message
743  * @scifdev:    Remote SCIF device node
744  * @unused:     Interrupt message (unused)
745  *
746  */
747 static __always_inline void
748 scif_exit_ack(struct scif_dev *scifdev, struct scifmsg *unused)
749 {
750         scifdev->exit = OP_COMPLETED;
751         wake_up(&scif_info.exitwq);
752 }
753
754 /**
755  * scif_node_add() - Respond to SCIF_NODE_ADD interrupt message
756  * @scifdev:    Remote SCIF device node
757  * @msg:        Interrupt message
758  *
759  * When the mgmt node driver has finished initializing a MIC node queue pair it
760  * marks the node as online. It then looks for all currently online MIC cards
761  * and send a SCIF_NODE_ADD message to identify the ID of the new card for
762  * peer to peer initialization
763  *
764  * The local node allocates its incoming queue and sends its address in the
765  * SCIF_NODE_ADD_ACK message back to the mgmt node, the mgmt node "reflects"
766  * this message to the new node
767  */
768 static __always_inline void
769 scif_node_add(struct scif_dev *scifdev, struct scifmsg *msg)
770 {
771         struct scif_dev *newdev;
772         dma_addr_t qp_offset;
773         int qp_connect;
774         struct scif_hw_dev *sdev;
775
776         dev_dbg(&scifdev->sdev->dev,
777                 "Scifdev %d:%d received NODE_ADD msg for node %d\n",
778                 scifdev->node, msg->dst.node, msg->src.node);
779         dev_dbg(&scifdev->sdev->dev,
780                 "Remote address for this node's aperture %llx\n",
781                 msg->payload[0]);
782         newdev = &scif_dev[msg->src.node];
783         newdev->node = msg->src.node;
784         newdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
785         sdev = newdev->sdev;
786
787         if (scif_setup_intr_wq(newdev)) {
788                 dev_err(&scifdev->sdev->dev,
789                         "failed to setup interrupts for %d\n", msg->src.node);
790                 goto interrupt_setup_error;
791         }
792         newdev->mmio.va = ioremap(msg->payload[1], sdev->mmio->len);
793         if (!newdev->mmio.va) {
794                 dev_err(&scifdev->sdev->dev,
795                         "failed to map mmio for %d\n", msg->src.node);
796                 goto mmio_map_error;
797         }
798         newdev->qpairs = kzalloc(sizeof(*newdev->qpairs), GFP_KERNEL);
799         if (!newdev->qpairs)
800                 goto qp_alloc_error;
801         /*
802          * Set the base address of the remote node's memory since it gets
803          * added to qp_offset
804          */
805         newdev->base_addr = msg->payload[0];
806
807         qp_connect = scif_setup_qp_connect(newdev->qpairs, &qp_offset,
808                                            SCIF_NODE_QP_SIZE, newdev);
809         if (qp_connect) {
810                 dev_err(&scifdev->sdev->dev,
811                         "failed to setup qp_connect %d\n", qp_connect);
812                 goto qp_connect_error;
813         }
814
815         newdev->db = sdev->hw_ops->next_db(sdev);
816         newdev->cookie = sdev->hw_ops->request_irq(sdev, scif_intr_handler,
817                                                    "SCIF_INTR", newdev,
818                                                    newdev->db);
819         if (IS_ERR(newdev->cookie))
820                 goto qp_connect_error;
821         newdev->qpairs->magic = SCIFEP_MAGIC;
822         newdev->qpairs->qp_state = SCIF_QP_OFFLINE;
823
824         msg->uop = SCIF_NODE_ADD_ACK;
825         msg->dst.node = msg->src.node;
826         msg->src.node = scif_info.nodeid;
827         msg->payload[0] = qp_offset;
828         msg->payload[2] = newdev->db;
829         scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
830         return;
831 qp_connect_error:
832         kfree(newdev->qpairs);
833         newdev->qpairs = NULL;
834 qp_alloc_error:
835         iounmap(newdev->mmio.va);
836         newdev->mmio.va = NULL;
837 mmio_map_error:
838 interrupt_setup_error:
839         dev_err(&scifdev->sdev->dev,
840                 "node add failed for node %d\n", msg->src.node);
841         msg->uop = SCIF_NODE_ADD_NACK;
842         msg->dst.node = msg->src.node;
843         msg->src.node = scif_info.nodeid;
844         scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
845 }
846
847 void scif_poll_qp_state(struct work_struct *work)
848 {
849 #define SCIF_NODE_QP_RETRY 100
850 #define SCIF_NODE_QP_TIMEOUT 100
851         struct scif_dev *peerdev = container_of(work, struct scif_dev,
852                                                         p2p_dwork.work);
853         struct scif_qp *qp = &peerdev->qpairs[0];
854
855         if (qp->qp_state != SCIF_QP_ONLINE ||
856             qp->remote_qp->qp_state != SCIF_QP_ONLINE) {
857                 if (peerdev->p2p_retry++ == SCIF_NODE_QP_RETRY) {
858                         dev_err(&peerdev->sdev->dev,
859                                 "Warning: QP check timeout with state %d\n",
860                                 qp->qp_state);
861                         goto timeout;
862                 }
863                 schedule_delayed_work(&peerdev->p2p_dwork,
864                                       msecs_to_jiffies(SCIF_NODE_QP_TIMEOUT));
865                 return;
866         }
867         return;
868 timeout:
869         dev_err(&peerdev->sdev->dev,
870                 "%s %d remote node %d offline,  state = 0x%x\n",
871                 __func__, __LINE__, peerdev->node, qp->qp_state);
872         qp->remote_qp->qp_state = SCIF_QP_OFFLINE;
873         scif_peer_unregister_device(peerdev);
874         scif_cleanup_scifdev(peerdev);
875 }
876
877 /**
878  * scif_node_add_ack() - Respond to SCIF_NODE_ADD_ACK interrupt message
879  * @scifdev:    Remote SCIF device node
880  * @msg:        Interrupt message
881  *
882  * After a MIC node receives the SCIF_NODE_ADD_ACK message it send this
883  * message to the mgmt node to confirm the sequence is finished.
884  *
885  */
886 static __always_inline void
887 scif_node_add_ack(struct scif_dev *scifdev, struct scifmsg *msg)
888 {
889         struct scif_dev *peerdev;
890         struct scif_qp *qp;
891         struct scif_dev *dst_dev = &scif_dev[msg->dst.node];
892
893         dev_dbg(&scifdev->sdev->dev,
894                 "Scifdev %d received SCIF_NODE_ADD_ACK msg src %d dst %d\n",
895                 scifdev->node, msg->src.node, msg->dst.node);
896         dev_dbg(&scifdev->sdev->dev,
897                 "payload %llx %llx %llx %llx\n", msg->payload[0],
898                 msg->payload[1], msg->payload[2], msg->payload[3]);
899         if (scif_is_mgmt_node()) {
900                 /*
901                  * the lock serializes with scif_qp_response_ack. The mgmt node
902                  * is forwarding the NODE_ADD_ACK message from src to dst we
903                  * need to make sure that the dst has already received a
904                  * NODE_ADD for src and setup its end of the qp to dst
905                  */
906                 mutex_lock(&scif_info.conflock);
907                 msg->payload[1] = scif_info.maxid;
908                 scif_nodeqp_send(dst_dev, msg);
909                 mutex_unlock(&scif_info.conflock);
910                 return;
911         }
912         peerdev = &scif_dev[msg->src.node];
913         peerdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
914         peerdev->node = msg->src.node;
915
916         qp = &peerdev->qpairs[0];
917
918         if ((scif_setup_qp_connect_response(peerdev, &peerdev->qpairs[0],
919                                             msg->payload[0])))
920                 goto local_error;
921         peerdev->rdb = msg->payload[2];
922         qp->remote_qp->qp_state = SCIF_QP_ONLINE;
923
924         scif_peer_register_device(peerdev);
925
926         schedule_delayed_work(&peerdev->p2p_dwork, 0);
927         return;
928 local_error:
929         scif_cleanup_scifdev(peerdev);
930 }
931
932 /**
933  * scif_node_add_nack: Respond to SCIF_NODE_ADD_NACK interrupt message
934  * @scifdev:    Remote SCIF device node
935  * @msg:        Interrupt message
936  *
937  * SCIF_NODE_ADD failed, so inform the waiting wq.
938  */
939 static __always_inline void
940 scif_node_add_nack(struct scif_dev *scifdev, struct scifmsg *msg)
941 {
942         if (scif_is_mgmt_node()) {
943                 struct scif_dev *dst_dev = &scif_dev[msg->dst.node];
944
945                 dev_dbg(&scifdev->sdev->dev,
946                         "SCIF_NODE_ADD_NACK received from %d\n", scifdev->node);
947                 scif_nodeqp_send(dst_dev, msg);
948         }
949 }
950
951 /**
952  * scif_node_remove: Handle SCIF_NODE_REMOVE message
953  * @scifdev:    Remote SCIF device node
954  * @msg: Interrupt message
955  *
956  * Handle node removal.
957  */
958 static __always_inline void
959 scif_node_remove(struct scif_dev *scifdev, struct scifmsg *msg)
960 {
961         int node = msg->payload[0];
962         struct scif_dev *scdev = &scif_dev[node];
963
964         scdev->node_remove_ack_pending = true;
965         scif_handle_remove_node(node);
966 }
967
968 /**
969  * scif_node_remove_ack: Handle SCIF_NODE_REMOVE_ACK message
970  * @scifdev:    Remote SCIF device node
971  * @msg: Interrupt message
972  *
973  * The peer has acked a SCIF_NODE_REMOVE message.
974  */
975 static __always_inline void
976 scif_node_remove_ack(struct scif_dev *scifdev, struct scifmsg *msg)
977 {
978         struct scif_dev *sdev = &scif_dev[msg->payload[0]];
979
980         atomic_inc(&sdev->disconn_rescnt);
981         wake_up(&sdev->disconn_wq);
982 }
983
984 /**
985  * scif_get_node_info: Respond to SCIF_GET_NODE_INFO interrupt message
986  * @scifdev:    Remote SCIF device node
987  * @msg:        Interrupt message
988  *
989  * Retrieve node info i.e maxid and total from the mgmt node.
990  */
991 static __always_inline void
992 scif_get_node_info_resp(struct scif_dev *scifdev, struct scifmsg *msg)
993 {
994         if (scif_is_mgmt_node()) {
995                 swap(msg->dst.node, msg->src.node);
996                 mutex_lock(&scif_info.conflock);
997                 msg->payload[1] = scif_info.maxid;
998                 msg->payload[2] = scif_info.total;
999                 mutex_unlock(&scif_info.conflock);
1000                 scif_nodeqp_send(scifdev, msg);
1001         } else {
1002                 struct completion *node_info =
1003                         (struct completion *)msg->payload[3];
1004
1005                 mutex_lock(&scif_info.conflock);
1006                 scif_info.maxid = msg->payload[1];
1007                 scif_info.total = msg->payload[2];
1008                 complete_all(node_info);
1009                 mutex_unlock(&scif_info.conflock);
1010         }
1011 }
1012
1013 static void
1014 scif_msg_unknown(struct scif_dev *scifdev, struct scifmsg *msg)
1015 {
1016         /* Bogus Node Qp Message? */
1017         dev_err(&scifdev->sdev->dev,
1018                 "Unknown message 0x%xn scifdev->node 0x%x\n",
1019                 msg->uop, scifdev->node);
1020 }
1021
1022 static void (*scif_intr_func[SCIF_MAX_MSG + 1])
1023             (struct scif_dev *, struct scifmsg *msg) = {
1024         scif_msg_unknown,       /* Error */
1025         scif_init,              /* SCIF_INIT */
1026         scif_exit,              /* SCIF_EXIT */
1027         scif_exit_ack,          /* SCIF_EXIT_ACK */
1028         scif_node_add,          /* SCIF_NODE_ADD */
1029         scif_node_add_ack,      /* SCIF_NODE_ADD_ACK */
1030         scif_node_add_nack,     /* SCIF_NODE_ADD_NACK */
1031         scif_node_remove,       /* SCIF_NODE_REMOVE */
1032         scif_node_remove_ack,   /* SCIF_NODE_REMOVE_ACK */
1033         scif_cnctreq,           /* SCIF_CNCT_REQ */
1034         scif_cnctgnt,           /* SCIF_CNCT_GNT */
1035         scif_cnctgnt_ack,       /* SCIF_CNCT_GNTACK */
1036         scif_cnctgnt_nack,      /* SCIF_CNCT_GNTNACK */
1037         scif_cnctrej,           /* SCIF_CNCT_REJ */
1038         scif_discnct,           /* SCIF_DISCNCT */
1039         scif_discnt_ack,        /* SCIF_DISCNT_ACK */
1040         scif_clientsend,        /* SCIF_CLIENT_SENT */
1041         scif_clientrcvd,        /* SCIF_CLIENT_RCVD */
1042         scif_get_node_info_resp,/* SCIF_GET_NODE_INFO */
1043         scif_recv_reg,          /* SCIF_REGISTER */
1044         scif_recv_reg_ack,      /* SCIF_REGISTER_ACK */
1045         scif_recv_reg_nack,     /* SCIF_REGISTER_NACK */
1046         scif_recv_unreg,        /* SCIF_UNREGISTER */
1047         scif_recv_unreg_ack,    /* SCIF_UNREGISTER_ACK */
1048         scif_recv_unreg_nack,   /* SCIF_UNREGISTER_NACK */
1049         scif_alloc_req,         /* SCIF_ALLOC_REQ */
1050         scif_alloc_gnt_rej,     /* SCIF_ALLOC_GNT */
1051         scif_alloc_gnt_rej,     /* SCIF_ALLOC_REJ */
1052         scif_free_virt,         /* SCIF_FREE_VIRT */
1053         scif_recv_munmap,       /* SCIF_MUNMAP */
1054         scif_recv_mark,         /* SCIF_MARK */
1055         scif_recv_mark_resp,    /* SCIF_MARK_ACK */
1056         scif_recv_mark_resp,    /* SCIF_MARK_NACK */
1057         scif_recv_wait,         /* SCIF_WAIT */
1058         scif_recv_wait_resp,    /* SCIF_WAIT_ACK */
1059         scif_recv_wait_resp,    /* SCIF_WAIT_NACK */
1060         scif_recv_sig_local,    /* SCIF_SIG_LOCAL */
1061         scif_recv_sig_remote,   /* SCIF_SIG_REMOTE */
1062         scif_recv_sig_resp,     /* SCIF_SIG_ACK */
1063         scif_recv_sig_resp,     /* SCIF_SIG_NACK */
1064 };
1065
1066 static int scif_max_msg_id = SCIF_MAX_MSG;
1067 /**
1068  * scif_nodeqp_msg_handler() - Common handler for node messages
1069  * @scifdev: Remote device to respond to
1070  * @qp: Remote memory pointer
1071  * @msg: The message to be handled.
1072  *
1073  * This routine calls the appropriate routine to handle a Node Qp
1074  * message receipt
1075  */
1076 static void
1077 scif_nodeqp_msg_handler(struct scif_dev *scifdev,
1078                         struct scif_qp *qp, struct scifmsg *msg)
1079 {
1080         scif_display_message(scifdev, msg, "Rcvd");
1081
1082         if (msg->uop > (u32)scif_max_msg_id) {
1083                 /* Bogus Node Qp Message? */
1084                 dev_err(&scifdev->sdev->dev,
1085                         "Unknown message 0x%xn scifdev->node 0x%x\n",
1086                         msg->uop, scifdev->node);
1087                 return;
1088         }
1089
1090         scif_intr_func[msg->uop](scifdev, msg);
1091 }
1092
1093 /**
1094  * scif_nodeqp_intrhandler() - Interrupt handler for node messages
1095  * @scifdev:    Remote device to respond to
1096  * @qp:         Remote memory pointer
1097  *
1098  * This routine is triggered by the interrupt mechanism.  It reads
1099  * messages from the node queue RB and calls the Node QP Message handling
1100  * routine.
1101  */
1102 void scif_nodeqp_intrhandler(struct scif_dev *scifdev, struct scif_qp *qp)
1103 {
1104         struct scifmsg msg;
1105         int read_size;
1106
1107         do {
1108                 read_size = scif_rb_get_next(&qp->inbound_q, &msg, sizeof(msg));
1109                 if (!read_size)
1110                         break;
1111                 scif_nodeqp_msg_handler(scifdev, qp, &msg);
1112                 /*
1113                  * The node queue pair is unmapped so skip the read pointer
1114                  * update after receipt of a SCIF_EXIT_ACK
1115                  */
1116                 if (SCIF_EXIT_ACK == msg.uop)
1117                         break;
1118                 scif_rb_update_read_ptr(&qp->inbound_q);
1119         } while (1);
1120 }
1121
1122 /**
1123  * scif_loopb_wq_handler - Loopback Workqueue Handler.
1124  * @unused: loop back work (unused)
1125  *
1126  * This work queue routine is invoked by the loopback work queue handler.
1127  * It grabs the recv lock, dequeues any available messages from the head
1128  * of the loopback message list, calls the node QP message handler,
1129  * waits for it to return, then frees up this message and dequeues more
1130  * elements of the list if available.
1131  */
1132 static void scif_loopb_wq_handler(struct work_struct *unused)
1133 {
1134         struct scif_dev *scifdev = scif_info.loopb_dev;
1135         struct scif_qp *qp = scifdev->qpairs;
1136         struct scif_loopb_msg *msg;
1137
1138         do {
1139                 msg = NULL;
1140                 spin_lock(&qp->recv_lock);
1141                 if (!list_empty(&scif_info.loopb_recv_q)) {
1142                         msg = list_first_entry(&scif_info.loopb_recv_q,
1143                                                struct scif_loopb_msg,
1144                                                list);
1145                         list_del(&msg->list);
1146                 }
1147                 spin_unlock(&qp->recv_lock);
1148
1149                 if (msg) {
1150                         scif_nodeqp_msg_handler(scifdev, qp, &msg->msg);
1151                         kfree(msg);
1152                 }
1153         } while (msg);
1154 }
1155
1156 /**
1157  * scif_loopb_msg_handler() - Workqueue handler for loopback messages.
1158  * @scifdev: SCIF device
1159  * @qp: Queue pair.
1160  *
1161  * This work queue routine is triggered when a loopback message is received.
1162  *
1163  * We need special handling for receiving Node Qp messages on a loopback SCIF
1164  * device via two workqueues for receiving messages.
1165  *
1166  * The reason we need the extra workqueue which is not required with *normal*
1167  * non-loopback SCIF devices is the potential classic deadlock described below:
1168  *
1169  * Thread A tries to send a message on a loopback SCIF device and blocks since
1170  * there is no space in the RB while it has the send_lock held or another
1171  * lock called lock X for example.
1172  *
1173  * Thread B: The Loopback Node QP message receive workqueue receives the message
1174  * and tries to send a message (eg an ACK) to the loopback SCIF device. It tries
1175  * to grab the send lock again or lock X and deadlocks with Thread A. The RB
1176  * cannot be drained any further due to this classic deadlock.
1177  *
1178  * In order to avoid deadlocks as mentioned above we have an extra level of
1179  * indirection achieved by having two workqueues.
1180  * 1) The first workqueue whose handler is scif_loopb_msg_handler reads
1181  * messages from the Node QP RB, adds them to a list and queues work for the
1182  * second workqueue.
1183  *
1184  * 2) The second workqueue whose handler is scif_loopb_wq_handler dequeues
1185  * messages from the list, handles them, frees up the memory and dequeues
1186  * more elements from the list if possible.
1187  */
1188 int
1189 scif_loopb_msg_handler(struct scif_dev *scifdev, struct scif_qp *qp)
1190 {
1191         int read_size;
1192         struct scif_loopb_msg *msg;
1193
1194         do {
1195                 msg = kmalloc(sizeof(*msg), GFP_KERNEL);
1196                 if (!msg)
1197                         return -ENOMEM;
1198                 read_size = scif_rb_get_next(&qp->inbound_q, &msg->msg,
1199                                              sizeof(struct scifmsg));
1200                 if (read_size != sizeof(struct scifmsg)) {
1201                         kfree(msg);
1202                         scif_rb_update_read_ptr(&qp->inbound_q);
1203                         break;
1204                 }
1205                 spin_lock(&qp->recv_lock);
1206                 list_add_tail(&msg->list, &scif_info.loopb_recv_q);
1207                 spin_unlock(&qp->recv_lock);
1208                 queue_work(scif_info.loopb_wq, &scif_info.loopb_work);
1209                 scif_rb_update_read_ptr(&qp->inbound_q);
1210         } while (read_size == sizeof(struct scifmsg));
1211         return read_size;
1212 }
1213
1214 /**
1215  * scif_setup_loopback_qp - One time setup work for Loopback Node Qp.
1216  * @scifdev: SCIF device
1217  *
1218  * Sets up the required loopback workqueues, queue pairs and ring buffers
1219  */
1220 int scif_setup_loopback_qp(struct scif_dev *scifdev)
1221 {
1222         int err = 0;
1223         void *local_q;
1224         struct scif_qp *qp;
1225
1226         err = scif_setup_intr_wq(scifdev);
1227         if (err)
1228                 goto exit;
1229         INIT_LIST_HEAD(&scif_info.loopb_recv_q);
1230         snprintf(scif_info.loopb_wqname, sizeof(scif_info.loopb_wqname),
1231                  "SCIF LOOPB %d", scifdev->node);
1232         scif_info.loopb_wq =
1233                 alloc_ordered_workqueue(scif_info.loopb_wqname, 0);
1234         if (!scif_info.loopb_wq) {
1235                 err = -ENOMEM;
1236                 goto destroy_intr;
1237         }
1238         INIT_WORK(&scif_info.loopb_work, scif_loopb_wq_handler);
1239         /* Allocate Self Qpair */
1240         scifdev->qpairs = kzalloc(sizeof(*scifdev->qpairs), GFP_KERNEL);
1241         if (!scifdev->qpairs) {
1242                 err = -ENOMEM;
1243                 goto destroy_loopb_wq;
1244         }
1245
1246         qp = scifdev->qpairs;
1247         qp->magic = SCIFEP_MAGIC;
1248         spin_lock_init(&qp->send_lock);
1249         spin_lock_init(&qp->recv_lock);
1250
1251         local_q = kzalloc(SCIF_NODE_QP_SIZE, GFP_KERNEL);
1252         if (!local_q) {
1253                 err = -ENOMEM;
1254                 goto free_qpairs;
1255         }
1256         /*
1257          * For loopback the inbound_q and outbound_q are essentially the same
1258          * since the Node sends a message on the loopback interface to the
1259          * outbound_q which is then received on the inbound_q.
1260          */
1261         scif_rb_init(&qp->outbound_q,
1262                      &qp->local_read,
1263                      &qp->local_write,
1264                      local_q, get_count_order(SCIF_NODE_QP_SIZE));
1265
1266         scif_rb_init(&qp->inbound_q,
1267                      &qp->local_read,
1268                      &qp->local_write,
1269                      local_q, get_count_order(SCIF_NODE_QP_SIZE));
1270         scif_info.nodeid = scifdev->node;
1271
1272         scif_peer_register_device(scifdev);
1273
1274         scif_info.loopb_dev = scifdev;
1275         return err;
1276 free_qpairs:
1277         kfree(scifdev->qpairs);
1278 destroy_loopb_wq:
1279         destroy_workqueue(scif_info.loopb_wq);
1280 destroy_intr:
1281         scif_destroy_intr_wq(scifdev);
1282 exit:
1283         return err;
1284 }
1285
1286 /**
1287  * scif_destroy_loopback_qp - One time uninit work for Loopback Node Qp
1288  * @scifdev: SCIF device
1289  *
1290  * Destroys the workqueues and frees up the Ring Buffer and Queue Pair memory.
1291  */
1292 int scif_destroy_loopback_qp(struct scif_dev *scifdev)
1293 {
1294         scif_peer_unregister_device(scifdev);
1295         destroy_workqueue(scif_info.loopb_wq);
1296         scif_destroy_intr_wq(scifdev);
1297         kfree(scifdev->qpairs->outbound_q.rb_base);
1298         kfree(scifdev->qpairs);
1299         scifdev->sdev = NULL;
1300         scif_info.loopb_dev = NULL;
1301         return 0;
1302 }
1303
1304 void scif_destroy_p2p(struct scif_dev *scifdev)
1305 {
1306         struct scif_dev *peer_dev;
1307         struct scif_p2p_info *p2p;
1308         struct list_head *pos, *tmp;
1309         int bd;
1310
1311         mutex_lock(&scif_info.conflock);
1312         /* Free P2P mappings in the given node for all its peer nodes */
1313         list_for_each_safe(pos, tmp, &scifdev->p2p) {
1314                 p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
1315                 dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
1316                              p2p->sg_nentries[SCIF_PPI_MMIO],
1317                              DMA_BIDIRECTIONAL);
1318                 dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
1319                              p2p->sg_nentries[SCIF_PPI_APER],
1320                              DMA_BIDIRECTIONAL);
1321                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
1322                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
1323                 list_del(pos);
1324                 kfree(p2p);
1325         }
1326
1327         /* Free P2P mapping created in the peer nodes for the given node */
1328         for (bd = SCIF_MGMT_NODE + 1; bd <= scif_info.maxid; bd++) {
1329                 peer_dev = &scif_dev[bd];
1330                 list_for_each_safe(pos, tmp, &peer_dev->p2p) {
1331                         p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
1332                         if (p2p->ppi_peer_id == scifdev->node) {
1333                                 dma_unmap_sg(&peer_dev->sdev->dev,
1334                                              p2p->ppi_sg[SCIF_PPI_MMIO],
1335                                              p2p->sg_nentries[SCIF_PPI_MMIO],
1336                                              DMA_BIDIRECTIONAL);
1337                                 dma_unmap_sg(&peer_dev->sdev->dev,
1338                                              p2p->ppi_sg[SCIF_PPI_APER],
1339                                              p2p->sg_nentries[SCIF_PPI_APER],
1340                                              DMA_BIDIRECTIONAL);
1341                                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
1342                                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
1343                                 list_del(pos);
1344                                 kfree(p2p);
1345                         }
1346                 }
1347         }
1348         mutex_unlock(&scif_info.conflock);
1349 }