Merge https://git.kernel.org/pub/scm/linux/kernel/git/bpf/bpf-next
[linux-2.6-microblaze.git] / net / smc / smc_tx.c
index be241d5..5df3940 100644 (file)
@@ -31,7 +31,6 @@
 #include "smc_tracepoint.h"
 
 #define SMC_TX_WORK_DELAY      0
-#define SMC_TX_CORK_DELAY      (HZ >> 2)       /* 250 ms */
 
 /***************************** sndbuf producer *******************************/
 
@@ -236,16 +235,15 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
                 */
                if ((msg->msg_flags & MSG_OOB) && !send_remaining)
                        conn->urg_tx_pend = true;
-               if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc)) &&
-                   (atomic_read(&conn->sndbuf_space) >
-                                               (conn->sndbuf_desc->len >> 1)))
-                       /* for a corked socket defer the RDMA writes if there
-                        * is still sufficient sndbuf_space available
+               if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc) ||
+                    msg->msg_flags & MSG_SENDPAGE_NOTLAST) &&
+                   (atomic_read(&conn->sndbuf_space)))
+                       /* for a corked socket defer the RDMA writes if
+                        * sndbuf_space is still available. The applications
+                        * should know how/when to uncork it.
                         */
-                       queue_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
-                                          SMC_TX_CORK_DELAY);
-               else
-                       smc_tx_sndbuf_nonempty(conn);
+                       continue;
+               smc_tx_sndbuf_nonempty(conn);
 
                trace_smc_tx_sendmsg(smc, copylen);
        } /* while (msg_data_left(msg)) */
@@ -260,6 +258,22 @@ out_err:
        return rc;
 }
 
+int smc_tx_sendpage(struct smc_sock *smc, struct page *page, int offset,
+                   size_t size, int flags)
+{
+       struct msghdr msg = {.msg_flags = flags};
+       char *kaddr = kmap(page);
+       struct kvec iov;
+       int rc;
+
+       iov.iov_base = kaddr + offset;
+       iov.iov_len = size;
+       iov_iter_kvec(&msg.msg_iter, WRITE, &iov, 1, size);
+       rc = smc_tx_sendmsg(smc, &msg, size);
+       kunmap(page);
+       return rc;
+}
+
 /***************************** sndbuf consumer *******************************/
 
 /* sndbuf consumer: actual data transfer of one target chunk with ISM write */
@@ -598,26 +612,36 @@ int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
 }
 
 /* Wakeup sndbuf consumers from process context
- * since there is more data to transmit
+ * since there is more data to transmit. The caller
+ * must hold sock lock.
  */
-void smc_tx_work(struct work_struct *work)
+void smc_tx_pending(struct smc_connection *conn)
 {
-       struct smc_connection *conn = container_of(to_delayed_work(work),
-                                                  struct smc_connection,
-                                                  tx_work);
        struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
        int rc;
 
-       lock_sock(&smc->sk);
        if (smc->sk.sk_err)
-               goto out;
+               return;
 
        rc = smc_tx_sndbuf_nonempty(conn);
        if (!rc && conn->local_rx_ctrl.prod_flags.write_blocked &&
            !atomic_read(&conn->bytes_to_rcv))
                conn->local_rx_ctrl.prod_flags.write_blocked = 0;
+}
+
+/* Wakeup sndbuf consumers from process context
+ * since there is more data to transmit; this
+ * variant acquires the sock lock itself.
+ */
+void smc_tx_work(struct work_struct *work)
+{
+       struct smc_connection *conn = container_of(to_delayed_work(work),
+                                                  struct smc_connection,
+                                                  tx_work);
+       struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
 
-out:
+       lock_sock(&smc->sk);
+       smc_tx_pending(conn);
        release_sock(&smc->sk);
 }