// SPDX-License-Identifier: GPL-2.0-only
/*
 * Intel MIC Platform Software Stack (MPSS)
 *
 * Copyright(c) 2014 Intel Corporation.
 *
 * Intel SCIF driver.
 */
#include "../bus/scif_bus.h"
#include "scif_peer_bus.h"
#include "scif_main.h"
#include "scif_nodeqp.h"
#include "scif_map.h"
/*
 ************************************************************************
 * SCIF node Queue Pair (QP) setup flow:
 *
 * 1) SCIF driver gets probed with a scif_hw_dev via the scif_hw_bus
 * 2) scif_setup_qp(..) allocates the local qp and calls
 *    scif_setup_qp_connect(..) which allocates and maps the local
 *    buffer for the inbound QP
 * 3) The local node updates the device page with the DMA address of the QP
 * 4) A delayed work is scheduled (qp_dwork) which periodically reads if
 *    the peer node has updated its QP DMA address
 * 5) Once a valid non-zero address is found in the QP DMA address field
 *    in the device page, the local node maps the remote node's QP,
 *    updates its outbound QP and sends a SCIF_INIT message to the peer
 * 6) The SCIF_INIT message is received by the peer node QP interrupt bottom
 *    half handler by calling scif_init(..)
 * 7) scif_init(..) registers a new SCIF peer node by calling
 *    scif_peer_register_device(..) which signifies the addition of a new
 *    SCIF node
 * 8) On the mgmt node, P2P network setup/teardown is initiated if all the
 *    remote nodes are online via scif_p2p_setup(..)
 * 9) For P2P setup, the host maps the remote nodes' aperture and memory
 *    bars and sends a SCIF_NODE_ADD message to both nodes
 * 10) As part of scif_node_add(..), both nodes set up their local inbound
 *     QPs and send a SCIF_NODE_ADD_ACK to the mgmt node
 * 11) As part of scif_node_add_ack(..) the mgmt node forwards the
 *     SCIF_NODE_ADD_ACK to the remote nodes
 * 12) As part of scif_node_add_ack(..) the remote nodes update their
 *     outbound QPs, make sure they can access memory on the remote node
 *     and then add a new SCIF peer node by calling
 *     scif_peer_register_device(..) which signifies the addition of a new
 *     SCIF node.
 * 13) The SCIF network is now established across all nodes.
 *
 ************************************************************************
 * SCIF node QP teardown flow (initiated by non mgmt node):
 *
 * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
 * 2) The device page QP DMA address field is updated with 0x0
 * 3) A non mgmt node now cleans up all local data structures and sends a
 *    SCIF_EXIT message to the peer and waits for a SCIF_EXIT_ACK
 * 4) As part of scif_exit(..) handling scif_disconnect_node(..) is called
 * 5) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the
 *    peers and waits for a SCIF_NODE_REMOVE_ACK
 * 6) As part of scif_node_remove(..) a remote node unregisters the peer
 *    node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
 * 7) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
 *    it sends itself a node remove message whose handling cleans up local
 *    data structures and unregisters the peer node from the SCIF network
 * 8) The mgmt node sends a SCIF_EXIT_ACK
 * 9) Upon receipt of the SCIF_EXIT_ACK the node initiating the teardown
 *    completes the SCIF remove routine
 * 10) The SCIF network is now torn down for the node initiating the
 *     teardown sequence
 *
 ************************************************************************
 * SCIF node QP teardown flow (initiated by mgmt node):
 *
 * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
 * 2) The device page QP DMA address field is updated with 0x0
 * 3) The mgmt node calls scif_disconnect_node(..)
 * 4) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the peers
 *    and waits for a SCIF_NODE_REMOVE_ACK
 * 5) As part of scif_node_remove(..) a remote node unregisters the peer
 *    node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
 * 6) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
 *    it unregisters the peer node from the SCIF network
 * 7) The mgmt node sends a SCIF_EXIT message and waits for a SCIF_EXIT_ACK.
 * 8) A non mgmt node upon receipt of a SCIF_EXIT message calls scif_stop(..)
 *    which would clean up local data structures for all SCIF nodes and
 *    then send a SCIF_EXIT_ACK back to the mgmt node
 * 9) Upon receipt of the SCIF_EXIT_ACK the mgmt node sends itself a node
 *    remove message whose handling cleans up local data structures and
 *    destroys any P2P mappings.
 * 10) The SCIF hardware device for which a remove callback was received is now
 *     disconnected from the SCIF network.
 */
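
/*
 * Illustrative sketch (not part of the driver): steps 3-5 of the setup flow
 * above boil down to a polled handshake over the device page. The helpers
 * scif_write_local_qp_addr() and scif_read_peer_qp_addr() are hypothetical
 * stand-ins for the real device page accessors, hence the #if 0.
 */
#if 0
static void scif_qp_dwork_sketch(struct work_struct *work)
{
	struct scif_dev *scifdev = container_of(to_delayed_work(work),
						struct scif_dev, qp_dwork);
	dma_addr_t phys;

	/* Step 3: advertise our inbound QP's DMA address to the peer */
	scif_write_local_qp_addr(scifdev, scifdev->qp_dma_addr);
	/* Steps 4-5: poll until the peer has advertised its QP */
	phys = scif_read_peer_qp_addr(scifdev);
	if (!phys) {
		/* Peer not ready yet; check again later */
		schedule_delayed_work(&scifdev->qp_dwork,
				      msecs_to_jiffies(1000));
		return;
	}
	/* Map the peer QP, wire up the outbound_q and say hello */
	if (!scif_setup_qp_accept(scifdev->qpairs, &scifdev->qp_dma_addr,
				  phys, SCIF_NODE_QP_SIZE, scifdev)) {
		struct scifmsg msg = { .uop = SCIF_INIT };

		scif_nodeqp_send(scifdev, &msg);
	}
}
#endif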
/*
 * Initializes "local" data structures for the QP. Allocates the QP
 * ring buffer (rb) and initializes the "inbound" queue.
 */
int scif_setup_qp_connect(struct scif_qp *qp, dma_addr_t *qp_offset,
			  int local_size, struct scif_dev *scifdev)
{
	void *local_q = qp->inbound_q.rb_base;
	int err = 0;
	u32 tmp_rd = 0;

	spin_lock_init(&qp->send_lock);
	spin_lock_init(&qp->recv_lock);

	/* Allocate rb only if not already allocated */
	if (!local_q) {
		local_q = kzalloc(local_size, GFP_KERNEL);
		if (!local_q)
			return -ENOMEM;
	}

	err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
	if (err)
		goto kfree;
	/*
	 * To setup the inbound_q, the buffer lives locally, the read pointer
	 * is remote and the write pointer is local.
	 */
	scif_rb_init(&qp->inbound_q,
		     &tmp_rd,
		     &qp->local_write,
		     local_q, get_count_order(local_size));
	/*
	 * The read pointer is NULL initially and it is unsafe to use the ring
	 * buffer till this changes!
	 */
	qp->inbound_q.read_ptr = NULL;
	err = scif_map_single(qp_offset, qp,
			      scifdev, sizeof(struct scif_qp));
	if (err)
		goto unmap;
	qp->local_qp = *qp_offset;
	return err;
unmap:
	scif_unmap_single(qp->local_buf, scifdev, local_size);
	qp->local_buf = 0;
kfree:
	kfree(local_q);
	return err;
}
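
/*
 * Illustrative sketch: on the connect side the inbound_q is brought up in
 * two phases. Phase 1 (above) points the ring at a dummy read offset
 * (tmp_rd) because the peer's scif_qp is not mapped yet; phase 2
 * (scif_setup_qp_connect_response() further below) re-runs scif_rb_init()
 * against the real &qp->remote_qp->local_read once the peer responds.
 */
#if 0
/* Phase 1: placeholder read pointer; ring unusable (read_ptr == NULL) */
scif_rb_init(&qp->inbound_q, &tmp_rd, &qp->local_write,
	     local_q, get_count_order(local_size));
/* Phase 2: rewire against the peer's published read offset */
scif_rb_init(&qp->inbound_q, &qp->remote_qp->local_read, &qp->local_write,
	     qp->inbound_q.rb_base, get_count_order(qp->inbound_q.size));
#endif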

/* When the other side has already done its allocation, this is called */
int scif_setup_qp_accept(struct scif_qp *qp, dma_addr_t *qp_offset,
			 dma_addr_t phys, int local_size,
			 struct scif_dev *scifdev)
{
	void *local_q;
	struct scif_qp *remote_qp;
	void *remote_q;
	int remote_size;
	int err = 0;

	spin_lock_init(&qp->send_lock);
	spin_lock_init(&qp->recv_lock);
	/* Start by figuring out where we need to point */
	remote_qp = scif_ioremap(phys, sizeof(struct scif_qp), scifdev);
	if (!remote_qp)
		return -EIO;
	qp->remote_qp = remote_qp;
	if (qp->remote_qp->magic != SCIFEP_MAGIC) {
		err = -ENODEV;
		goto iounmap;
	}
	qp->remote_buf = remote_qp->local_buf;
	remote_size = qp->remote_qp->inbound_q.size;
	remote_q = scif_ioremap(qp->remote_buf, remote_size, scifdev);
	if (!remote_q) {
		err = -EIO;
		goto iounmap;
	}
	qp->remote_qp->local_write = 0;
	/*
	 * To setup the outbound_q, the buffer lives in remote memory,
	 * the read pointer is local, the write pointer is remote
	 */
	scif_rb_init(&qp->outbound_q,
		     &qp->local_read,
		     &qp->remote_qp->local_write,
		     remote_q,
		     get_count_order(remote_size));
	local_q = kzalloc(local_size, GFP_KERNEL);
	if (!local_q) {
		err = -ENOMEM;
		goto iounmap_1;
	}
	err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
	if (err)
		goto kfree;
	qp->remote_qp->local_read = 0;
	/*
	 * To setup the inbound_q, the buffer lives locally, the read pointer
	 * is remote and the write pointer is local
	 */
	scif_rb_init(&qp->inbound_q,
		     &qp->remote_qp->local_read,
		     &qp->local_write,
		     local_q, get_count_order(local_size));
	err = scif_map_single(qp_offset, qp, scifdev,
			      sizeof(struct scif_qp));
	if (err)
		goto unmap;
	qp->local_qp = *qp_offset;
	return err;
unmap:
	scif_unmap_single(qp->local_buf, scifdev, local_size);
	qp->local_buf = 0;
kfree:
	kfree(local_q);
iounmap_1:
	scif_iounmap(remote_q, remote_size, scifdev);
	qp->outbound_q.rb_base = NULL;
iounmap:
	scif_iounmap(qp->remote_qp, sizeof(struct scif_qp), scifdev);
	qp->remote_qp = NULL;
	return err;
}

int scif_setup_qp_connect_response(struct scif_dev *scifdev,
				   struct scif_qp *qp, u64 payload)
{
	int err = 0;
	void *r_buf;
	int remote_size;
	phys_addr_t tmp_phys;

	qp->remote_qp = scif_ioremap(payload, sizeof(struct scif_qp), scifdev);

	if (!qp->remote_qp) {
		err = -ENOMEM;
		goto error;
	}
	if (qp->remote_qp->magic != SCIFEP_MAGIC) {
		dev_err(&scifdev->sdev->dev,
			"SCIFEP_MAGIC mismatch between self %d remote %d\n",
			scif_dev[scif_info.nodeid].node, scifdev->node);
		err = -ENODEV;
		goto error;
	}
	tmp_phys = qp->remote_qp->local_buf;
	remote_size = qp->remote_qp->inbound_q.size;
	r_buf = scif_ioremap(tmp_phys, remote_size, scifdev);

	if (!r_buf)
		return -EIO;

	qp->local_read = 0;
	scif_rb_init(&qp->outbound_q,
		     &qp->local_read,
		     &qp->remote_qp->local_write,
		     r_buf,
		     get_count_order(remote_size));
	/*
	 * Because the node QP may already be processing an INIT message, set
	 * the read pointer so the cached read offset isn't lost
	 */
	qp->remote_qp->local_read = qp->inbound_q.current_read_offset;
	/*
	 * Resetup the inbound_q now that we know where the
	 * inbound_read really is.
	 */
	scif_rb_init(&qp->inbound_q,
		     &qp->remote_qp->local_read,
		     &qp->local_write,
		     qp->inbound_q.rb_base,
		     get_count_order(qp->inbound_q.size));
error:
	return err;
}

static __always_inline void
scif_send_msg_intr(struct scif_dev *scifdev)
{
	struct scif_hw_dev *sdev = scifdev->sdev;

	if (scifdev_is_p2p(scifdev))
		sdev->hw_ops->send_p2p_intr(sdev, scifdev->rdb, &scifdev->mmio);
	else
		sdev->hw_ops->send_intr(sdev, scifdev->rdb);
}

int scif_qp_response(phys_addr_t phys, struct scif_dev *scifdev)
{
	int err = 0;
	struct scifmsg msg;

	err = scif_setup_qp_connect_response(scifdev, scifdev->qpairs, phys);
	if (!err) {
		/*
		 * Now that everything is setup and mapped, we're ready
		 * to tell the peer about our queue's location
		 */
		msg.uop = SCIF_INIT;
		msg.dst.node = scifdev->node;
		err = scif_nodeqp_send(scifdev, &msg);
	}
	return err;
}

void scif_send_exit(struct scif_dev *scifdev)
{
	struct scifmsg msg;
	int ret;

	scifdev->exit = OP_IN_PROGRESS;
	msg.uop = SCIF_EXIT;
	msg.src.node = scif_info.nodeid;
	msg.dst.node = scifdev->node;
	ret = scif_nodeqp_send(scifdev, &msg);
	if (ret)
		goto done;
	/* Wait for a SCIF_EXIT_ACK message */
	wait_event_timeout(scif_info.exitwq, scifdev->exit == OP_COMPLETED,
			   SCIF_NODE_ALIVE_TIMEOUT);
done:
	scifdev->exit = OP_IDLE;
}

int scif_setup_qp(struct scif_dev *scifdev)
{
	int err = 0;
	int local_size;
	struct scif_qp *qp;

	local_size = SCIF_NODE_QP_SIZE;

	qp = kzalloc(sizeof(*qp), GFP_KERNEL);
	if (!qp) {
		err = -ENOMEM;
		return err;
	}
	qp->magic = SCIFEP_MAGIC;
	scifdev->qpairs = qp;
	err = scif_setup_qp_connect(qp, &scifdev->qp_dma_addr,
				    local_size, scifdev);
	if (err)
		goto free_qp;
	/*
	 * We're as setup as we can be. The inbound_q is setup, w/o a usable
	 * outbound q. When we get a message, the read_ptr will be updated,
	 * and we will pull the message.
	 */
	return err;
free_qp:
	kfree(scifdev->qpairs);
	scifdev->qpairs = NULL;
	return err;
}

static void scif_p2p_freesg(struct scatterlist *sg)
{
	kfree(sg);
}

static struct scatterlist *
scif_p2p_setsg(phys_addr_t pa, int page_size, int page_cnt)
{
	struct scatterlist *sg;
	struct page *page;
	int i;

	sg = kcalloc(page_cnt, sizeof(struct scatterlist), GFP_KERNEL);
	if (!sg)
		return NULL;
	sg_init_table(sg, page_cnt);
	for (i = 0; i < page_cnt; i++) {
		page = pfn_to_page(pa >> PAGE_SHIFT);
		sg_set_page(&sg[i], page, page_size, 0);
		pa += page_size;
	}
	return sg;
}

/* Init p2p mappings required to access peerdev from scifdev */
static struct scif_p2p_info *
scif_init_p2p_info(struct scif_dev *scifdev, struct scif_dev *peerdev)
{
	struct scif_p2p_info *p2p;
	int num_mmio_pages, num_aper_pages, sg_page_shift, err, num_aper_chunks;
	struct scif_hw_dev *psdev = peerdev->sdev;
	struct scif_hw_dev *sdev = scifdev->sdev;

	num_mmio_pages = psdev->mmio->len >> PAGE_SHIFT;
	num_aper_pages = psdev->aper->len >> PAGE_SHIFT;

	p2p = kzalloc(sizeof(*p2p), GFP_KERNEL);
	if (!p2p)
		return NULL;
	p2p->ppi_sg[SCIF_PPI_MMIO] = scif_p2p_setsg(psdev->mmio->pa,
						    PAGE_SIZE, num_mmio_pages);
	if (!p2p->ppi_sg[SCIF_PPI_MMIO])
		goto free_p2p;
	p2p->sg_nentries[SCIF_PPI_MMIO] = num_mmio_pages;
	sg_page_shift = get_order(min(psdev->aper->len, (u64)(1 << 30)));
	num_aper_chunks = num_aper_pages >> (sg_page_shift - PAGE_SHIFT);
	p2p->ppi_sg[SCIF_PPI_APER] = scif_p2p_setsg(psdev->aper->pa,
						    1 << sg_page_shift,
						    num_aper_chunks);
	p2p->sg_nentries[SCIF_PPI_APER] = num_aper_chunks;
	err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
			 num_mmio_pages, PCI_DMA_BIDIRECTIONAL);
	if (err != num_mmio_pages)
		goto scif_p2p_free;
	err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
			 num_aper_chunks, PCI_DMA_BIDIRECTIONAL);
	if (err != num_aper_chunks)
		goto dma_unmap;
	p2p->ppi_da[SCIF_PPI_MMIO] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_MMIO]);
	p2p->ppi_da[SCIF_PPI_APER] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_APER]);
	p2p->ppi_len[SCIF_PPI_MMIO] = num_mmio_pages;
	p2p->ppi_len[SCIF_PPI_APER] = num_aper_pages;
	p2p->ppi_peer_id = peerdev->node;
	return p2p;
dma_unmap:
	dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
		     p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
scif_p2p_free:
	scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
	scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
free_p2p:
	kfree(p2p);
	return NULL;
}

/* Uninitialize and release resources from a p2p mapping */
static void scif_deinit_p2p_info(struct scif_dev *scifdev,
				 struct scif_p2p_info *p2p)
{
	struct scif_hw_dev *sdev = scifdev->sdev;

	dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
		     p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
	dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
		     p2p->sg_nentries[SCIF_PPI_APER], DMA_BIDIRECTIONAL);
	scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
	scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
	kfree(p2p);
}

/**
 * scif_node_connect: Respond to SCIF_NODE_CONNECT interrupt message
 * @scifdev: SCIF device
 * @dst: Destination node
 *
 * Connect the src and dst node by setting up the p2p connection
 * between them. The management node acts as a proxy here.
 */
static void scif_node_connect(struct scif_dev *scifdev, int dst)
{
	struct scif_dev *dev_j = scifdev;
	struct scif_dev *dev_i = NULL;
	struct scif_p2p_info *p2p_ij = NULL;	/* bus addr for j from i */
	struct scif_p2p_info *p2p_ji = NULL;	/* bus addr for i from j */
	struct scif_p2p_info *p2p;
	struct list_head *pos, *tmp;
	struct scifmsg msg;
	int err;
	u64 tmppayload;

	if (dst < 1 || dst > scif_info.maxid)
		return;

	dev_i = &scif_dev[dst];

	if (!_scifdev_alive(dev_i))
		return;
	/*
	 * If the p2p connection is already setup or in the process of setting
	 * up then just ignore this request. The requested node will get
	 * informed by SCIF_NODE_ADD_ACK or SCIF_NODE_ADD_NACK
	 */
	if (!list_empty(&dev_i->p2p)) {
		list_for_each_safe(pos, tmp, &dev_i->p2p) {
			p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
			if (p2p->ppi_peer_id == dev_j->node)
				return;
		}
	}
	p2p_ij = scif_init_p2p_info(dev_i, dev_j);
	if (!p2p_ij)
		return;
	p2p_ji = scif_init_p2p_info(dev_j, dev_i);
	if (!p2p_ji) {
		scif_deinit_p2p_info(dev_i, p2p_ij);
		return;
	}
	list_add_tail(&p2p_ij->ppi_list, &dev_i->p2p);
	list_add_tail(&p2p_ji->ppi_list, &dev_j->p2p);

	/*
	 * Send a SCIF_NODE_ADD to dev_i, pass it its bus address
	 * as seen from dev_j
	 */
	msg.uop = SCIF_NODE_ADD;
	msg.src.node = dev_j->node;
	msg.dst.node = dev_i->node;

	msg.payload[0] = p2p_ji->ppi_da[SCIF_PPI_APER];
	msg.payload[1] = p2p_ij->ppi_da[SCIF_PPI_MMIO];
	msg.payload[2] = p2p_ij->ppi_da[SCIF_PPI_APER];
	msg.payload[3] = p2p_ij->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;

	err = scif_nodeqp_send(dev_i, &msg);
	if (err) {
		dev_err(&scifdev->sdev->dev,
			"%s %d error %d\n", __func__, __LINE__, err);
		return;
	}

	/* Same as above but to dev_j */
	msg.uop = SCIF_NODE_ADD;
	msg.src.node = dev_i->node;
	msg.dst.node = dev_j->node;

	tmppayload = msg.payload[0];
	msg.payload[0] = msg.payload[2];
	msg.payload[2] = tmppayload;
	msg.payload[1] = p2p_ji->ppi_da[SCIF_PPI_MMIO];
	msg.payload[3] = p2p_ji->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;

	scif_nodeqp_send(dev_j, &msg);
}
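
/*
 * Illustrative sketch (not a struct used by the driver): how the four
 * SCIF_NODE_ADD payload quadwords assembled above read from the receiving
 * node's point of view. The field names are invented for illustration;
 * the code simply indexes payload[0..3].
 */
#if 0
struct scif_node_add_payload_sketch {
	u64 own_aper_da;   /* payload[0]: receiver's aperture as mapped by the peer */
	u64 peer_mmio_da;  /* payload[1]: where the receiver can reach the peer's MMIO */
	u64 peer_aper_da;  /* payload[2]: where the receiver can reach the peer's aperture */
	u64 peer_aper_len; /* payload[3]: peer aperture length in bytes */
};
#endif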

static void scif_p2p_setup(void)
{
	int i, j;

	if (!scif_info.p2p_enable)
		return;

	for (i = 1; i <= scif_info.maxid; i++)
		if (!_scifdev_alive(&scif_dev[i]))
			return;

	for (i = 1; i <= scif_info.maxid; i++) {
		for (j = 1; j <= scif_info.maxid; j++) {
			struct scif_dev *scifdev = &scif_dev[i];

			if (i == j)
				continue;
			scif_node_connect(scifdev, j);
		}
	}
}

static char *message_types[] = {"BAD",
				"SCIF_INIT",
				"SCIF_EXIT",
				"SCIF_EXIT_ACK",
				"SCIF_NODE_ADD",
				"SCIF_NODE_ADD_ACK",
				"SCIF_NODE_ADD_NACK",
				"SCIF_NODE_REMOVE",
				"SCIF_NODE_REMOVE_ACK",
				"SCIF_CNCT_REQ",
				"SCIF_CNCT_GNT",
				"SCIF_CNCT_GNTACK",
				"SCIF_CNCT_GNTNACK",
				"SCIF_CNCT_REJ",
				"SCIF_DISCNCT",
				"SCIF_DISCNT_ACK",
				"SCIF_CLIENT_SENT",
				"SCIF_CLIENT_RCVD",
				"SCIF_GET_NODE_INFO",
				"SCIF_REGISTER",
				"SCIF_REGISTER_ACK",
				"SCIF_REGISTER_NACK",
				"SCIF_UNREGISTER",
				"SCIF_UNREGISTER_ACK",
				"SCIF_UNREGISTER_NACK",
				"SCIF_ALLOC_REQ",
				"SCIF_ALLOC_GNT",
				"SCIF_ALLOC_REJ",
				"SCIF_FREE_VIRT",
				"SCIF_MUNMAP",
				"SCIF_MARK",
				"SCIF_MARK_ACK",
				"SCIF_MARK_NACK",
				"SCIF_WAIT",
				"SCIF_WAIT_ACK",
				"SCIF_WAIT_NACK",
				"SCIF_SIG_LOCAL",
				"SCIF_SIG_REMOTE",
				"SCIF_SIG_ACK",
				"SCIF_SIG_NACK"};

static void
scif_display_message(struct scif_dev *scifdev, struct scifmsg *msg,
		     const char *label)
{
	if (!scif_info.en_msg_log)
		return;
	if (msg->uop > SCIF_MAX_MSG) {
		dev_err(&scifdev->sdev->dev,
			"%s: unknown msg type %d\n", label, msg->uop);
		return;
	}
	dev_info(&scifdev->sdev->dev,
		 "%s: msg type %s, src %d:%d, dest %d:%d payload 0x%llx:0x%llx:0x%llx:0x%llx\n",
		 label, message_types[msg->uop], msg->src.node, msg->src.port,
		 msg->dst.node, msg->dst.port, msg->payload[0], msg->payload[1],
		 msg->payload[2], msg->payload[3]);
}

int _scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
{
	struct scif_qp *qp = scifdev->qpairs;
	int err = -ENOMEM, loop_cnt = 0;

	scif_display_message(scifdev, msg, "Sent");

	spin_lock(&qp->send_lock);

	while ((err = scif_rb_write(&qp->outbound_q,
				    msg, sizeof(struct scifmsg)))) {
		mdelay(1);
#define SCIF_NODEQP_SEND_TO_MSEC (3 * 1000)
		if (loop_cnt++ > (SCIF_NODEQP_SEND_TO_MSEC)) {
			err = -ENODEV;
			break;
		}
	}
	if (!err)
		scif_rb_commit(&qp->outbound_q);
	spin_unlock(&qp->send_lock);
	if (!err) {
		if (scifdev_self(scifdev))
			/*
			 * For loopback we need to emulate an interrupt by
			 * queuing work for the queue handling real node
			 * Qp messages.
			 */
			queue_work(scifdev->intr_wq, &scifdev->intr_bh);
		else
			scif_send_msg_intr(scifdev);
	}
	if (err)
		dev_dbg(&scifdev->sdev->dev,
			"%s %d error %d uop %d\n",
			__func__, __LINE__, err, msg->uop);
	return err;
}

/**
 * scif_nodeqp_send - Send a message on the node queue pair
 * @scifdev: Scif Device.
 * @msg: The message to be sent.
 */
int scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
{
	int err;
	struct device *spdev = NULL;

	if (msg->uop > SCIF_EXIT_ACK) {
		/* Don't send messages once the exit flow has begun */
		if (OP_IDLE != scifdev->exit)
			return -ENODEV;
		spdev = scif_get_peer_dev(scifdev);
		if (IS_ERR(spdev)) {
			err = PTR_ERR(spdev);
			return err;
		}
	}
	err = _scif_nodeqp_send(scifdev, msg);
	if (msg->uop > SCIF_EXIT_ACK)
		scif_put_peer_dev(spdev);
	return err;
}
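
/*
 * Illustrative sketch (mirrors the pattern used by the handlers below):
 * composing and sending a node QP message. Only uop, src/dst and the
 * payload slots a given message type consumes need to be filled in. The
 * function name is invented for illustration.
 */
#if 0
static int scif_send_sketch(struct scif_dev *scifdev)
{
	struct scifmsg msg;

	msg.uop = SCIF_GET_NODE_INFO;
	msg.src.node = scif_info.nodeid;
	msg.dst.node = scifdev->node;
	msg.payload[3] = 0;	/* message-type specific */
	return scif_nodeqp_send(scifdev, &msg);
}
#endif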

/*
 * Work queue handler for servicing miscellaneous SCIF tasks.
 * Examples include:
 * 1) Remote fence requests.
 * 2) Destruction of temporary registered windows
 *    created during scif_vreadfrom()/scif_vwriteto().
 * 3) Cleanup of zombie endpoints.
 */
void scif_misc_handler(struct work_struct *work)
{
	scif_rma_handle_remote_fences();
	scif_rma_destroy_windows();
	scif_rma_destroy_tcw_invalid();
	scif_cleanup_zombie_epd();
}

/**
 * scif_init() - Respond to SCIF_INIT interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 */
static __always_inline void
scif_init(struct scif_dev *scifdev, struct scifmsg *msg)
{
	/*
	 * Allow the thread waiting for device page updates for the peer QP DMA
	 * address to complete initializing the inbound_q.
	 */
	flush_delayed_work(&scifdev->qp_dwork);

	scif_peer_register_device(scifdev);

	if (scif_is_mgmt_node()) {
		mutex_lock(&scif_info.conflock);
		scif_p2p_setup();
		mutex_unlock(&scif_info.conflock);
	}
}

/**
 * scif_exit() - Respond to SCIF_EXIT interrupt message
 * @scifdev:    Remote SCIF device node
 * @unused:     Interrupt message (unused)
 *
 * This function stops the SCIF interface for the node which sent
 * the SCIF_EXIT message and starts waiting for that node to
 * resetup the queue pair again.
 */
static __always_inline void
scif_exit(struct scif_dev *scifdev, struct scifmsg *unused)
{
	scifdev->exit_ack_pending = true;
	if (scif_is_mgmt_node())
		scif_disconnect_node(scifdev->node, false);
	else
		scif_stop(scifdev);
	schedule_delayed_work(&scifdev->qp_dwork,
			      msecs_to_jiffies(1000));
}

/**
 * scif_exit_ack() - Respond to SCIF_EXIT_ACK interrupt message
 * @scifdev:    Remote SCIF device node
 * @unused:     Interrupt message (unused)
 */
static __always_inline void
scif_exit_ack(struct scif_dev *scifdev, struct scifmsg *unused)
{
	scifdev->exit = OP_COMPLETED;
	wake_up(&scif_info.exitwq);
}

/**
 * scif_node_add() - Respond to SCIF_NODE_ADD interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * When the mgmt node driver has finished initializing a MIC node queue pair it
 * marks the node as online. It then looks for all currently online MIC cards
 * and sends a SCIF_NODE_ADD message to identify the ID of the new card for
 * peer to peer initialization.
 *
 * The local node allocates its incoming queue and sends its address in the
 * SCIF_NODE_ADD_ACK message back to the mgmt node, and the mgmt node "reflects"
 * this message to the new node.
 */
static __always_inline void
scif_node_add(struct scif_dev *scifdev, struct scifmsg *msg)
{
	struct scif_dev *newdev;
	dma_addr_t qp_offset;
	int qp_connect;
	struct scif_hw_dev *sdev;

	dev_dbg(&scifdev->sdev->dev,
		"Scifdev %d:%d received NODE_ADD msg for node %d\n",
		scifdev->node, msg->dst.node, msg->src.node);
	dev_dbg(&scifdev->sdev->dev,
		"Remote address for this node's aperture %llx\n",
		msg->payload[0]);
	newdev = &scif_dev[msg->src.node];
	newdev->node = msg->src.node;
	newdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
	sdev = newdev->sdev;

	if (scif_setup_intr_wq(newdev)) {
		dev_err(&scifdev->sdev->dev,
			"failed to setup interrupts for %d\n", msg->src.node);
		goto interrupt_setup_error;
	}
	newdev->mmio.va = ioremap(msg->payload[1], sdev->mmio->len);
	if (!newdev->mmio.va) {
		dev_err(&scifdev->sdev->dev,
			"failed to map mmio for %d\n", msg->src.node);
		goto interrupt_setup_error;
	}
	newdev->qpairs = kzalloc(sizeof(*newdev->qpairs), GFP_KERNEL);
	if (!newdev->qpairs)
		goto mmio_map_error;
	/*
	 * Set the base address of the remote node's memory since it gets
	 * added to qp_offset
	 */
	newdev->base_addr = msg->payload[0];

	qp_connect = scif_setup_qp_connect(newdev->qpairs, &qp_offset,
					   SCIF_NODE_QP_SIZE, newdev);
	if (qp_connect) {
		dev_err(&scifdev->sdev->dev,
			"failed to setup qp_connect %d\n", qp_connect);
		goto qp_connect_error;
	}
	newdev->db = sdev->hw_ops->next_db(sdev);
	newdev->cookie = sdev->hw_ops->request_irq(sdev, scif_intr_handler,
						   "SCIF_INTR", newdev,
						   newdev->db);
	if (IS_ERR(newdev->cookie))
		goto qp_connect_error;
	newdev->qpairs->magic = SCIFEP_MAGIC;
	newdev->qpairs->qp_state = SCIF_QP_OFFLINE;

	msg->uop = SCIF_NODE_ADD_ACK;
	msg->dst.node = msg->src.node;
	msg->src.node = scif_info.nodeid;
	msg->payload[0] = qp_offset;
	msg->payload[2] = newdev->db;
	scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
	return;
qp_connect_error:
	kfree(newdev->qpairs);
	newdev->qpairs = NULL;
mmio_map_error:
	iounmap(newdev->mmio.va);
	newdev->mmio.va = NULL;
interrupt_setup_error:
	dev_err(&scifdev->sdev->dev,
		"node add failed for node %d\n", msg->src.node);
	msg->uop = SCIF_NODE_ADD_NACK;
	msg->dst.node = msg->src.node;
	msg->src.node = scif_info.nodeid;
	scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
}

void scif_poll_qp_state(struct work_struct *work)
{
#define SCIF_NODE_QP_RETRY 100
#define SCIF_NODE_QP_TIMEOUT 100
	struct scif_dev *peerdev = container_of(work, struct scif_dev,
						p2p_dwork.work);
	struct scif_qp *qp = &peerdev->qpairs[0];

	if (qp->qp_state != SCIF_QP_ONLINE ||
	    qp->remote_qp->qp_state != SCIF_QP_ONLINE) {
		if (peerdev->p2p_retry++ == SCIF_NODE_QP_RETRY) {
			dev_err(&peerdev->sdev->dev,
				"Warning: QP check timeout with state %d\n",
				qp->qp_state);
			goto timeout;
		}
		schedule_delayed_work(&peerdev->p2p_dwork,
				      msecs_to_jiffies(SCIF_NODE_QP_TIMEOUT));
	}
	return;
timeout:
	dev_err(&peerdev->sdev->dev,
		"%s %d remote node %d offline, state = 0x%x\n",
		__func__, __LINE__, peerdev->node, qp->qp_state);
	qp->remote_qp->qp_state = SCIF_QP_OFFLINE;
	scif_peer_unregister_device(peerdev);
	scif_cleanup_scifdev(peerdev);
}

/**
 * scif_node_add_ack() - Respond to SCIF_NODE_ADD_ACK interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * After a MIC node receives the SCIF_NODE_ADD_ACK message it sends this
 * message to the mgmt node to confirm the sequence is finished.
 */
static __always_inline void
scif_node_add_ack(struct scif_dev *scifdev, struct scifmsg *msg)
{
	struct scif_dev *peerdev;
	struct scif_qp *qp;
	struct scif_dev *dst_dev = &scif_dev[msg->dst.node];

	dev_dbg(&scifdev->sdev->dev,
		"Scifdev %d received SCIF_NODE_ADD_ACK msg src %d dst %d\n",
		scifdev->node, msg->src.node, msg->dst.node);
	dev_dbg(&scifdev->sdev->dev,
		"payload %llx %llx %llx %llx\n", msg->payload[0],
		msg->payload[1], msg->payload[2], msg->payload[3]);
	if (scif_is_mgmt_node()) {
		/*
		 * The lock serializes with scif_qp_response_ack. The mgmt node
		 * is forwarding the NODE_ADD_ACK message from src to dst, so
		 * we need to make sure that dst has already received a
		 * NODE_ADD for src and set up its end of the qp to dst.
		 */
		mutex_lock(&scif_info.conflock);
		msg->payload[1] = scif_info.maxid;
		scif_nodeqp_send(dst_dev, msg);
		mutex_unlock(&scif_info.conflock);
		return;
	}
	peerdev = &scif_dev[msg->src.node];
	peerdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
	peerdev->node = msg->src.node;

	qp = &peerdev->qpairs[0];

	if ((scif_setup_qp_connect_response(peerdev, &peerdev->qpairs[0],
					    msg->payload[0])))
		goto local_error;
	peerdev->rdb = msg->payload[2];
	qp->remote_qp->qp_state = SCIF_QP_ONLINE;

	scif_peer_register_device(peerdev);

	schedule_delayed_work(&peerdev->p2p_dwork, 0);
	return;
local_error:
	scif_cleanup_scifdev(peerdev);
}

/**
 * scif_node_add_nack: Respond to SCIF_NODE_ADD_NACK interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * SCIF_NODE_ADD failed, so inform the waiting wq.
 */
static __always_inline void
scif_node_add_nack(struct scif_dev *scifdev, struct scifmsg *msg)
{
	if (scif_is_mgmt_node()) {
		struct scif_dev *dst_dev = &scif_dev[msg->dst.node];

		dev_dbg(&scifdev->sdev->dev,
			"SCIF_NODE_ADD_NACK received from %d\n", scifdev->node);
		scif_nodeqp_send(dst_dev, msg);
	}
}

/**
 * scif_node_remove: Handle SCIF_NODE_REMOVE message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * Handle node removal.
 */
static __always_inline void
scif_node_remove(struct scif_dev *scifdev, struct scifmsg *msg)
{
	int node = msg->payload[0];
	struct scif_dev *scdev = &scif_dev[node];

	scdev->node_remove_ack_pending = true;
	scif_handle_remove_node(node);
}

/**
 * scif_node_remove_ack: Handle SCIF_NODE_REMOVE_ACK message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * The peer has acked a SCIF_NODE_REMOVE message.
 */
static __always_inline void
scif_node_remove_ack(struct scif_dev *scifdev, struct scifmsg *msg)
{
	struct scif_dev *sdev = &scif_dev[msg->payload[0]];

	atomic_inc(&sdev->disconn_rescnt);
	wake_up(&sdev->disconn_wq);
}

/**
 * scif_get_node_info: Respond to SCIF_GET_NODE_INFO interrupt message
 * @scifdev:    Remote SCIF device node
 * @msg:        Interrupt message
 *
 * Retrieve node info, i.e. maxid and total, from the mgmt node.
 */
static __always_inline void
scif_get_node_info_resp(struct scif_dev *scifdev, struct scifmsg *msg)
{
	if (scif_is_mgmt_node()) {
		swap(msg->dst.node, msg->src.node);
		mutex_lock(&scif_info.conflock);
		msg->payload[1] = scif_info.maxid;
		msg->payload[2] = scif_info.total;
		mutex_unlock(&scif_info.conflock);
		scif_nodeqp_send(scifdev, msg);
	} else {
		struct completion *node_info =
			(struct completion *)msg->payload[3];

		mutex_lock(&scif_info.conflock);
		scif_info.maxid = msg->payload[1];
		scif_info.total = msg->payload[2];
		complete_all(node_info);
		mutex_unlock(&scif_info.conflock);
	}
}
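
/*
 * Illustrative sketch of the requester side of the exchange above: a node
 * passes a completion through payload[3] of SCIF_GET_NODE_INFO and blocks
 * until the mgmt node's reply (handled above) fires complete_all(). The
 * function name is invented; the real caller lives elsewhere in the driver.
 */
#if 0
static int scif_query_node_info_sketch(struct scif_dev *mgmt_dev)
{
	DECLARE_COMPLETION_ONSTACK(node_info);
	struct scifmsg msg;
	int err;

	msg.uop = SCIF_GET_NODE_INFO;
	msg.src.node = scif_info.nodeid;
	msg.dst.node = SCIF_MGMT_NODE;
	msg.payload[3] = (u64)&node_info;
	err = scif_nodeqp_send(mgmt_dev, &msg);
	if (err)
		return err;
	wait_for_completion(&node_info);
	/* scif_info.maxid / scif_info.total are now up to date */
	return 0;
}
#endif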

static void
scif_msg_unknown(struct scif_dev *scifdev, struct scifmsg *msg)
{
	/* Bogus Node Qp Message? */
	dev_err(&scifdev->sdev->dev,
		"Unknown message 0x%x scifdev->node 0x%x\n",
		msg->uop, scifdev->node);
}

static void (*scif_intr_func[SCIF_MAX_MSG + 1])
	    (struct scif_dev *, struct scifmsg *msg) = {
	scif_msg_unknown,	/* Error */
	scif_init,		/* SCIF_INIT */
	scif_exit,		/* SCIF_EXIT */
	scif_exit_ack,		/* SCIF_EXIT_ACK */
	scif_node_add,		/* SCIF_NODE_ADD */
	scif_node_add_ack,	/* SCIF_NODE_ADD_ACK */
	scif_node_add_nack,	/* SCIF_NODE_ADD_NACK */
	scif_node_remove,	/* SCIF_NODE_REMOVE */
	scif_node_remove_ack,	/* SCIF_NODE_REMOVE_ACK */
	scif_cnctreq,		/* SCIF_CNCT_REQ */
	scif_cnctgnt,		/* SCIF_CNCT_GNT */
	scif_cnctgnt_ack,	/* SCIF_CNCT_GNTACK */
	scif_cnctgnt_nack,	/* SCIF_CNCT_GNTNACK */
	scif_cnctrej,		/* SCIF_CNCT_REJ */
	scif_discnct,		/* SCIF_DISCNCT */
	scif_discnt_ack,	/* SCIF_DISCNT_ACK */
	scif_clientsend,	/* SCIF_CLIENT_SENT */
	scif_clientrcvd,	/* SCIF_CLIENT_RCVD */
	scif_get_node_info_resp,/* SCIF_GET_NODE_INFO */
	scif_recv_reg,		/* SCIF_REGISTER */
	scif_recv_reg_ack,	/* SCIF_REGISTER_ACK */
	scif_recv_reg_nack,	/* SCIF_REGISTER_NACK */
	scif_recv_unreg,	/* SCIF_UNREGISTER */
	scif_recv_unreg_ack,	/* SCIF_UNREGISTER_ACK */
	scif_recv_unreg_nack,	/* SCIF_UNREGISTER_NACK */
	scif_alloc_req,		/* SCIF_ALLOC_REQ */
	scif_alloc_gnt_rej,	/* SCIF_ALLOC_GNT */
	scif_alloc_gnt_rej,	/* SCIF_ALLOC_REJ */
	scif_free_virt,		/* SCIF_FREE_VIRT */
	scif_recv_munmap,	/* SCIF_MUNMAP */
	scif_recv_mark,		/* SCIF_MARK */
	scif_recv_mark_resp,	/* SCIF_MARK_ACK */
	scif_recv_mark_resp,	/* SCIF_MARK_NACK */
	scif_recv_wait,		/* SCIF_WAIT */
	scif_recv_wait_resp,	/* SCIF_WAIT_ACK */
	scif_recv_wait_resp,	/* SCIF_WAIT_NACK */
	scif_recv_sig_local,	/* SCIF_SIG_LOCAL */
	scif_recv_sig_remote,	/* SCIF_SIG_REMOTE */
	scif_recv_sig_resp,	/* SCIF_SIG_ACK */
	scif_recv_sig_resp,	/* SCIF_SIG_NACK */
};

static int scif_max_msg_id = SCIF_MAX_MSG;

/**
 * scif_nodeqp_msg_handler() - Common handler for node messages
 * @scifdev: Remote device to respond to
 * @qp: Remote memory pointer
 * @msg: The message to be handled.
 *
 * This routine calls the appropriate routine to handle a Node Qp
 * message receipt.
 */
static void
scif_nodeqp_msg_handler(struct scif_dev *scifdev,
			struct scif_qp *qp, struct scifmsg *msg)
{
	scif_display_message(scifdev, msg, "Rcvd");

	if (msg->uop > (u32)scif_max_msg_id) {
		/* Bogus Node Qp Message? */
		dev_err(&scifdev->sdev->dev,
			"Unknown message 0x%x scifdev->node 0x%x\n",
			msg->uop, scifdev->node);
		return;
	}

	scif_intr_func[msg->uop](scifdev, msg);
}

/**
 * scif_nodeqp_intrhandler() - Interrupt handler for node messages
 * @scifdev: Remote device to respond to
 * @qp: Remote memory pointer
 *
 * This routine is triggered by the interrupt mechanism. It reads
 * messages from the node queue RB and calls the Node QP Message handling
 * routine.
 */
void scif_nodeqp_intrhandler(struct scif_dev *scifdev, struct scif_qp *qp)
{
	struct scifmsg msg;
	int read_size;

	do {
		read_size = scif_rb_get_next(&qp->inbound_q, &msg, sizeof(msg));
		if (!read_size)
			break;
		scif_nodeqp_msg_handler(scifdev, qp, &msg);
		/*
		 * The node queue pair is unmapped so skip the read pointer
		 * update after receipt of a SCIF_EXIT_ACK
		 */
		if (SCIF_EXIT_ACK == msg.uop)
			break;
		scif_rb_update_read_ptr(&qp->inbound_q);
	} while (read_size == sizeof(msg));
}

/**
 * scif_loopb_wq_handler - Loopback Workqueue Handler.
 * @unused: loop back work (unused)
 *
 * This work queue routine is invoked by the loopback work queue handler.
 * It grabs the recv lock, dequeues any available messages from the head
 * of the loopback message list, calls the node QP message handler,
 * waits for it to return, then frees up this message and dequeues more
 * elements of the list if available.
 */
static void scif_loopb_wq_handler(struct work_struct *unused)
{
	struct scif_dev *scifdev = scif_info.loopb_dev;
	struct scif_qp *qp = scifdev->qpairs;
	struct scif_loopb_msg *msg;

	do {
		msg = NULL;
		spin_lock(&qp->recv_lock);
		if (!list_empty(&scif_info.loopb_recv_q)) {
			msg = list_first_entry(&scif_info.loopb_recv_q,
					       struct scif_loopb_msg,
					       list);
			list_del(&msg->list);
		}
		spin_unlock(&qp->recv_lock);

		if (msg) {
			scif_nodeqp_msg_handler(scifdev, qp, &msg->msg);
			kfree(msg);
		}
	} while (msg);
}

/**
 * scif_loopb_msg_handler() - Workqueue handler for loopback messages.
 * @scifdev: SCIF device
 * @qp: Queue pair.
 *
 * This work queue routine is triggered when a loopback message is received.
 *
 * We need special handling for receiving Node Qp messages on a loopback SCIF
 * device via two workqueues for receiving messages.
 *
 * The reason we need the extra workqueue which is not required with *normal*
 * non-loopback SCIF devices is the potential classic deadlock described below:
 *
 * Thread A tries to send a message on a loopback SCIF device and blocks since
 * there is no space in the RB while it has the send_lock held or another
 * lock called lock X for example.
 *
 * Thread B: The Loopback Node QP message receive workqueue receives the message
 * and tries to send a message (e.g. an ACK) to the loopback SCIF device. It
 * tries to grab the send lock again or lock X and deadlocks with Thread A. The
 * RB cannot be drained any further due to this classic deadlock.
 *
 * In order to avoid deadlocks as mentioned above we have an extra level of
 * indirection achieved by having two workqueues.
 * 1) The first workqueue whose handler is scif_loopb_msg_handler reads
 * messages from the Node QP RB, adds them to a list and queues work for the
 * second workqueue.
 *
 * 2) The second workqueue whose handler is scif_loopb_wq_handler dequeues
 * messages from the list, handles them, frees up the memory and dequeues
 * more elements from the list if possible.
 */
int
scif_loopb_msg_handler(struct scif_dev *scifdev, struct scif_qp *qp)
{
	int read_size;
	struct scif_loopb_msg *msg;

	do {
		msg = kmalloc(sizeof(*msg), GFP_KERNEL);
		if (!msg)
			return -ENOMEM;
		read_size = scif_rb_get_next(&qp->inbound_q, &msg->msg,
					     sizeof(struct scifmsg));
		if (read_size != sizeof(struct scifmsg)) {
			kfree(msg);
			scif_rb_update_read_ptr(&qp->inbound_q);
			break;
		}
		spin_lock(&qp->recv_lock);
		list_add_tail(&msg->list, &scif_info.loopb_recv_q);
		spin_unlock(&qp->recv_lock);
		queue_work(scif_info.loopb_wq, &scif_info.loopb_work);
		scif_rb_update_read_ptr(&qp->inbound_q);
	} while (read_size == sizeof(struct scifmsg));
	return read_size;
}
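
/*
 * Illustrative sketch (hypothetical, NOT driver code): the deadlock the
 * two-workqueue scheme above avoids. If the receive path handled messages
 * inline, a handler that replies while the sender still holds send_lock
 * could never drain the ring.
 */
#if 0
/* Thread A: blocks in _scif_nodeqp_send() with send_lock held, ring full */
spin_lock(&qp->send_lock);
while (scif_rb_write(&qp->outbound_q, msg, sizeof(*msg)))
	; /* spins: ring full, waiting for the receiver to drain it */

/*
 * Thread B (receive path, if it handled messages inline): the handler
 * replies with an ACK and tries to take send_lock -> deadlock with A,
 * and the ring is never drained.
 */
scif_nodeqp_send(scifdev, &ack);	/* blocks on send_lock */
#endif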

/**
 * scif_setup_loopback_qp - One time setup work for Loopback Node Qp.
 * @scifdev: SCIF device
 *
 * Sets up the required loopback workqueues, queue pairs and ring buffers
 */
int scif_setup_loopback_qp(struct scif_dev *scifdev)
{
	int err = 0;
	void *local_q;
	struct scif_qp *qp;

	err = scif_setup_intr_wq(scifdev);
	if (err)
		goto exit;
	INIT_LIST_HEAD(&scif_info.loopb_recv_q);
	snprintf(scif_info.loopb_wqname, sizeof(scif_info.loopb_wqname),
		 "SCIF LOOPB %d", scifdev->node);
	scif_info.loopb_wq =
		alloc_ordered_workqueue(scif_info.loopb_wqname, 0);
	if (!scif_info.loopb_wq) {
		err = -ENOMEM;
		goto destroy_intr;
	}
	INIT_WORK(&scif_info.loopb_work, scif_loopb_wq_handler);
	/* Allocate Self Qpair */
	scifdev->qpairs = kzalloc(sizeof(*scifdev->qpairs), GFP_KERNEL);
	if (!scifdev->qpairs) {
		err = -ENOMEM;
		goto destroy_loopb_wq;
	}

	qp = scifdev->qpairs;
	qp->magic = SCIFEP_MAGIC;
	spin_lock_init(&qp->send_lock);
	spin_lock_init(&qp->recv_lock);

	local_q = kzalloc(SCIF_NODE_QP_SIZE, GFP_KERNEL);
	if (!local_q) {
		err = -ENOMEM;
		goto free_qpairs;
	}
	/*
	 * For loopback the inbound_q and outbound_q are essentially the same
	 * since the Node sends a message on the loopback interface to the
	 * outbound_q which is then received on the inbound_q.
	 */
	scif_rb_init(&qp->outbound_q,
		     &qp->local_read,
		     &qp->local_write,
		     local_q, get_count_order(SCIF_NODE_QP_SIZE));

	scif_rb_init(&qp->inbound_q,
		     &qp->local_read,
		     &qp->local_write,
		     local_q, get_count_order(SCIF_NODE_QP_SIZE));
	scif_info.nodeid = scifdev->node;

	scif_peer_register_device(scifdev);

	scif_info.loopb_dev = scifdev;
	return err;
free_qpairs:
	kfree(scifdev->qpairs);
destroy_loopb_wq:
	destroy_workqueue(scif_info.loopb_wq);
destroy_intr:
	scif_destroy_intr_wq(scifdev);
exit:
	return err;
}
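
/*
 * Illustrative sketch: because the loopback outbound_q and inbound_q above
 * share one buffer and one read/write pointer pair, a commit on the
 * outbound side is immediately visible to the inbound side. Hypothetical
 * snippet, ignoring locking and the interrupt emulation:
 */
#if 0
scif_rb_write(&qp->outbound_q, &msg, sizeof(msg));
scif_rb_commit(&qp->outbound_q);
/* the same bytes can now be read back from the other end of the "wire" */
scif_rb_get_next(&qp->inbound_q, &msg, sizeof(msg));
#endif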

/**
 * scif_destroy_loopback_qp - One time uninit work for Loopback Node Qp
 * @scifdev: SCIF device
 *
 * Destroys the workqueues and frees up the Ring Buffer and Queue Pair memory.
 */
int scif_destroy_loopback_qp(struct scif_dev *scifdev)
{
	scif_peer_unregister_device(scifdev);
	destroy_workqueue(scif_info.loopb_wq);
	scif_destroy_intr_wq(scifdev);
	kfree(scifdev->qpairs->outbound_q.rb_base);
	kfree(scifdev->qpairs);
	scifdev->sdev = NULL;
	scif_info.loopb_dev = NULL;
	return 0;
}

void scif_destroy_p2p(struct scif_dev *scifdev)
{
	struct scif_dev *peer_dev;
	struct scif_p2p_info *p2p;
	struct list_head *pos, *tmp;
	int bd;

	mutex_lock(&scif_info.conflock);
	/* Free P2P mappings in the given node for all its peer nodes */
	list_for_each_safe(pos, tmp, &scifdev->p2p) {
		p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
		dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
			     p2p->sg_nentries[SCIF_PPI_MMIO],
			     DMA_BIDIRECTIONAL);
		dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
			     p2p->sg_nentries[SCIF_PPI_APER],
			     DMA_BIDIRECTIONAL);
		scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
		scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
		list_del(pos);
		kfree(p2p);
	}

	/* Free P2P mappings created in the peer nodes for the given node */
	for (bd = SCIF_MGMT_NODE + 1; bd <= scif_info.maxid; bd++) {
		peer_dev = &scif_dev[bd];
		list_for_each_safe(pos, tmp, &peer_dev->p2p) {
			p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
			if (p2p->ppi_peer_id == scifdev->node) {
				dma_unmap_sg(&peer_dev->sdev->dev,
					     p2p->ppi_sg[SCIF_PPI_MMIO],
					     p2p->sg_nentries[SCIF_PPI_MMIO],
					     DMA_BIDIRECTIONAL);
				dma_unmap_sg(&peer_dev->sdev->dev,
					     p2p->ppi_sg[SCIF_PPI_APER],
					     p2p->sg_nentries[SCIF_PPI_APER],
					     DMA_BIDIRECTIONAL);
				scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
				scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
				list_del(pos);
				kfree(p2p);
			}
		}
	}
	mutex_unlock(&scif_info.conflock);
}