* @qrtr_tx_flow: tree of qrtr_tx_flow, keyed by node << 32 | port
* @qrtr_tx_lock: lock for qrtr_tx_flow inserts
* @rx_queue: receive queue
- * @work: scheduled work struct for recv work
* @item: list item for broadcast list
*/
struct qrtr_node {
struct mutex qrtr_tx_lock; /* for qrtr_tx_flow */
struct sk_buff_head rx_queue;
- struct work_struct work;
struct list_head item;
};
static int qrtr_bcast_enqueue(struct qrtr_node *node, struct sk_buff *skb,
int type, struct sockaddr_qrtr *from,
struct sockaddr_qrtr *to);
+static struct qrtr_sock *qrtr_port_lookup(int port);
+static void qrtr_port_put(struct qrtr_sock *ipc);
/* Release node resources and free the node.
*
list_del(&node->item);
mutex_unlock(&qrtr_node_lock);
- cancel_work_sync(&node->work);
skb_queue_purge(&node->rx_queue);
/* Free tx flow counters */
struct qrtr_node *node = ep->node;
const struct qrtr_hdr_v1 *v1;
const struct qrtr_hdr_v2 *v2;
+ struct qrtr_sock *ipc;
struct sk_buff *skb;
struct qrtr_cb *cb;
unsigned int size;
skb_put_data(skb, data + hdrlen, size);
- skb_queue_tail(&node->rx_queue, skb);
- schedule_work(&node->work);
+ qrtr_node_assign(node, cb->src_node);
+
+ if (cb->type == QRTR_TYPE_RESUME_TX) {
+ qrtr_tx_resume(node, skb);
+ } else {
+ ipc = qrtr_port_lookup(cb->dst_port);
+ if (!ipc)
+ goto err;
+
+ if (sock_queue_rcv_skb(&ipc->sk, skb))
+ goto err;
+
+ qrtr_port_put(ipc);
+ }
return 0;
return skb;
}
-static struct qrtr_sock *qrtr_port_lookup(int port);
-static void qrtr_port_put(struct qrtr_sock *ipc);
-
-/* Handle and route a received packet.
- *
- * This will auto-reply with resume-tx packet as necessary.
- */
-static void qrtr_node_rx_work(struct work_struct *work)
-{
- struct qrtr_node *node = container_of(work, struct qrtr_node, work);
- struct sk_buff *skb;
-
- while ((skb = skb_dequeue(&node->rx_queue)) != NULL) {
- struct qrtr_sock *ipc;
- struct qrtr_cb *cb = (struct qrtr_cb *)skb->cb;
-
- qrtr_node_assign(node, cb->src_node);
-
- if (cb->type == QRTR_TYPE_RESUME_TX) {
- qrtr_tx_resume(node, skb);
- } else {
- ipc = qrtr_port_lookup(cb->dst_port);
- if (!ipc) {
- kfree_skb(skb);
- } else {
- if (sock_queue_rcv_skb(&ipc->sk, skb))
- kfree_skb(skb);
-
- qrtr_port_put(ipc);
- }
- }
- }
-}
-
/**
* qrtr_endpoint_register() - register a new endpoint
* @ep: endpoint to register
if (!node)
return -ENOMEM;
- INIT_WORK(&node->work, qrtr_node_rx_work);
kref_init(&node->ref);
mutex_init(&node->ep_lock);
skb_queue_head_init(&node->rx_queue);