diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 166e36f..0ea9ae3 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -59,7 +59,6 @@
 #include "config.h"
 
 #define NEEDED_RMEM (4*1024*1024)
-#define CONN_HASH_SIZE 32
 
 /* Number of messages to send before rescheduling */
 #define MAX_SEND_MSG_COUNT 25
@@ -79,14 +78,20 @@ struct connection {
 #define CF_CLOSING 8
 #define CF_SHUTDOWN 9
 #define CF_CONNECTED 10
+#define CF_RECONNECT 11
+#define CF_DELAY_CONNECT 12
+#define CF_EOF 13
        struct list_head writequeue;  /* List of outgoing writequeue_entries */
        spinlock_t writequeue_lock;
+       atomic_t writequeue_cnt;
        void (*connect_action) (struct connection *);   /* What to do to connect */
        void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
+       bool (*eof_condition)(struct connection *con); /* What to check on EOF */
        int retries;
 #define MAX_CONNECT_RETRIES 3
        struct hlist_node list;
        struct connection *othercon;
+       struct connection *sendcon; /* on an othercon: the sending connection */
        struct work_struct rwork; /* Receive workqueue */
        struct work_struct swork; /* Send workqueue */
        wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
@@ -113,7 +118,22 @@ struct writequeue_entry {
        int len;
        int end;
        int users;
+       bool dirty; /* entry has been partially transmitted */
        struct connection *con;
+       struct list_head msgs;
+       struct kref ref;
+};
+
+struct dlm_msg {
+       struct writequeue_entry *entry; /* page-backed entry holding this msg */
+       struct dlm_msg *orig_msg; /* original msg if this is a resend copy */
+       bool retransmit; /* a resend copy of this msg has been queued */
+       void *ppc; /* start of this msg's data inside the entry's page */
+       int len;
+       int idx; /* srcu index handed from new() to commit() */
+
+       struct list_head list;
+       struct kref ref;
 };
 
 struct dlm_node_addr {
@@ -155,33 +175,23 @@ static void sctp_connect_to_sock(struct connection *con);
 static void tcp_connect_to_sock(struct connection *con);
 static void dlm_tcp_shutdown(struct connection *con);
 
-/* This is deliberately very simple because most clusters have simple
-   sequential nodeids, so we should be able to go straight to a connection
-   struct in the array */
-static inline int nodeid_hash(int nodeid)
+static struct connection *__find_con(int nodeid, int r)
 {
-       return nodeid & (CONN_HASH_SIZE-1);
-}
-
-static struct connection *__find_con(int nodeid)
-{
-       int r, idx;
        struct connection *con;
 
-       r = nodeid_hash(nodeid);
-
-       idx = srcu_read_lock(&connections_srcu);
        hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
-               if (con->nodeid == nodeid) {
-                       srcu_read_unlock(&connections_srcu, idx);
+               if (con->nodeid == nodeid)
                        return con;
-               }
        }
-       srcu_read_unlock(&connections_srcu, idx);
 
        return NULL;
 }
 
+static bool tcp_eof_condition(struct connection *con)
+{
+       return atomic_read(&con->writequeue_cnt);
+}
+
 static int dlm_con_init(struct connection *con, int nodeid)
 {
        con->rx_buflen = dlm_config.ci_buffer_size;
@@ -193,15 +203,23 @@ static int dlm_con_init(struct connection *con, int nodeid)
        mutex_init(&con->sock_mutex);
        INIT_LIST_HEAD(&con->writequeue);
        spin_lock_init(&con->writequeue_lock);
+       atomic_set(&con->writequeue_cnt, 0);
        INIT_WORK(&con->swork, process_send_sockets);
        INIT_WORK(&con->rwork, process_recv_sockets);
        init_waitqueue_head(&con->shutdown_wait);
 
-       if (dlm_config.ci_protocol == 0) {
+       switch (dlm_config.ci_protocol) {
+       case DLM_PROTO_TCP:
                con->connect_action = tcp_connect_to_sock;
                con->shutdown_action = dlm_tcp_shutdown;
-       } else {
+               con->eof_condition = tcp_eof_condition;
+               break;
+       case DLM_PROTO_SCTP:
                con->connect_action = sctp_connect_to_sock;
+               break;
+       default:
+               kfree(con->rx_buf);
+               return -EINVAL;
        }
 
        return 0;
@@ -216,7 +234,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
        struct connection *con, *tmp;
        int r, ret;
 
-       con = __find_con(nodeid);
+       r = nodeid_hash(nodeid);
+       con = __find_con(nodeid, r);
        if (con || !alloc)
                return con;
 
@@ -230,8 +249,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
                return NULL;
        }
 
-       r = nodeid_hash(nodeid);
-
        spin_lock(&connections_lock);
        /* Because multiple workqueues/threads call this function it can
         * race on multiple CPUs. Instead of locking the hot path __find_con()
@@ -239,7 +256,7 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
         * under protection of connections_lock. If this is the case we
         * abort our connection creation and return the existing connection.
         */
-       tmp = __find_con(nodeid);
+       tmp = __find_con(nodeid, r);
        if (tmp) {
                spin_unlock(&connections_lock);
                kfree(con->rx_buf);
@@ -256,15 +273,13 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 /* Loop round all connections */
 static void foreach_conn(void (*conn_func)(struct connection *c))
 {
-       int i, idx;
+       int i;
        struct connection *con;
 
-       idx = srcu_read_lock(&connections_srcu);
        for (i = 0; i < CONN_HASH_SIZE; i++) {
                hlist_for_each_entry_rcu(con, &connection_hash[i], list)
                        conn_func(con);
        }
-       srcu_read_unlock(&connections_srcu, idx);
 }
 
 static struct dlm_node_addr *find_node_addr(int nodeid)
@@ -462,6 +477,9 @@ static void lowcomms_data_ready(struct sock *sk)
 
 static void lowcomms_listen_data_ready(struct sock *sk)
 {
+       if (!dlm_allow_conn)
+               return;
+
        queue_work(recv_workqueue, &listen_con.rwork);
 }
 
@@ -518,14 +536,21 @@ static void lowcomms_state_change(struct sock *sk)
 int dlm_lowcomms_connect_node(int nodeid)
 {
        struct connection *con;
+       int idx;
 
        if (nodeid == dlm_our_nodeid())
                return 0;
 
+       idx = srcu_read_lock(&connections_srcu);
        con = nodeid2con(nodeid, GFP_NOFS);
-       if (!con)
+       if (!con) {
+               srcu_read_unlock(&connections_srcu, idx);
                return -ENOMEM;
+       }
+
        lowcomms_connect_sock(con);
+       srcu_read_unlock(&connections_srcu, idx);
+
        return 0;
 }
 
@@ -587,6 +612,22 @@ static void lowcomms_error_report(struct sock *sk)
                                   dlm_config.ci_tcp_port, sk->sk_err,
                                   sk->sk_err_soft);
        }
+
+       /* the handling below applies to the sendcon only */
+       if (test_bit(CF_IS_OTHERCON, &con->flags))
+               con = con->sendcon;
+
+       switch (sk->sk_err) {
+       case ECONNREFUSED:
+               set_bit(CF_DELAY_CONNECT, &con->flags);
+               break;
+       default:
+               break;
+       }
+
+       if (!test_and_set_bit(CF_RECONNECT, &con->flags))
+               queue_work(send_workqueue, &con->swork);
+
 out:
        read_unlock_bh(&sk->sk_callback_lock);
        if (orig_report)
@@ -669,6 +710,42 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
        memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
 }
 
+static void dlm_page_release(struct kref *kref)
+{
+       struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
+                                                 ref);
+
+       __free_page(e->page);
+       kfree(e);
+}
+
+static void dlm_msg_release(struct kref *kref)
+{
+       struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
+
+       kref_put(&msg->entry->ref, dlm_page_release);
+       kfree(msg);
+}
+
+static void free_entry(struct writequeue_entry *e)
+{
+       struct dlm_msg *msg, *tmp;
+
+       list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
+               if (msg->orig_msg) {
+                       msg->orig_msg->retransmit = false;
+                       kref_put(&msg->orig_msg->ref, dlm_msg_release);
+               }
+
+               list_del(&msg->list);
+               kref_put(&msg->ref, dlm_msg_release);
+       }
+
+       list_del(&e->list);
+       atomic_dec(&e->con->writequeue_cnt);
+       kref_put(&e->ref, dlm_page_release);
+}
+
 static void dlm_close_sock(struct socket **sock)
 {
        if (*sock) {
@@ -683,6 +760,7 @@ static void close_connection(struct connection *con, bool and_other,
                             bool tx, bool rx)
 {
        bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
+       struct writequeue_entry *e;
 
        if (tx && !closing && cancel_work_sync(&con->swork)) {
                log_print("canceled swork for node %d", con->nodeid);
@@ -698,12 +776,35 @@ static void close_connection(struct connection *con, bool and_other,
 
        if (con->othercon && and_other) {
                /* Will only re-enter once. */
-               close_connection(con->othercon, false, true, true);
+               close_connection(con->othercon, false, tx, rx);
+       }
+
+       /* If a writequeue entry was only partially sent, drop the whole
+        * entry, because after a reconnect we must not resume in the
+        * middle of a message, which would confuse the other end.
+        *
+        * We can always drop messages, because they will be retransmitted,
+        * but we cannot allow half a message to go out, as it might be
+        * processed by the other side.
+        *
+        * Our policy is to start from a clean state after a disconnect;
+        * we don't know what was sent or received at the transport layer
+        * in that case.
+        */
+       spin_lock(&con->writequeue_lock);
+       if (!list_empty(&con->writequeue)) {
+               e = list_first_entry(&con->writequeue, struct writequeue_entry,
+                                    list);
+               if (e->dirty)
+                       free_entry(e);
        }
+       spin_unlock(&con->writequeue_lock);
 
        con->rx_leftover = 0;
        con->retries = 0;
        clear_bit(CF_CONNECTED, &con->flags);
+       clear_bit(CF_DELAY_CONNECT, &con->flags);
+       clear_bit(CF_RECONNECT, &con->flags);
+       clear_bit(CF_EOF, &con->flags);
        mutex_unlock(&con->sock_mutex);
        clear_bit(CF_CLOSING, &con->flags);
 }
@@ -841,19 +942,26 @@ out_resched:
        return -EAGAIN;
 
 out_close:
-       mutex_unlock(&con->sock_mutex);
-       if (ret != -EAGAIN) {
-               /* Reconnect when there is something to send */
-               close_connection(con, false, true, false);
-               if (ret == 0) {
-                       log_print("connection %p got EOF from %d",
-                                 con, con->nodeid);
+       if (ret == 0) {
+               log_print("connection %p got EOF from %d",
+                         con, con->nodeid);
+
+               if (con->eof_condition && con->eof_condition(con)) {
+                       set_bit(CF_EOF, &con->flags);
+                       mutex_unlock(&con->sock_mutex);
+               } else {
+                       mutex_unlock(&con->sock_mutex);
+                       close_connection(con, false, true, false);
+
                        /* handling for tcp shutdown */
                        clear_bit(CF_SHUTDOWN, &con->flags);
                        wake_up(&con->shutdown_wait);
-                       /* signal to breaking receive worker */
-                       ret = -1;
                }
+
+               /* signal the receive worker to break out */
+               ret = -1;
+       } else {
+               mutex_unlock(&con->sock_mutex);
        }
        return ret;
 }
@@ -864,16 +972,12 @@ static int accept_from_sock(struct listen_connection *con)
        int result;
        struct sockaddr_storage peeraddr;
        struct socket *newsock;
-       int len;
+       int len, idx;
        int nodeid;
        struct connection *newcon;
        struct connection *addcon;
        unsigned int mark;
 
-       if (!dlm_allow_conn) {
-               return -1;
-       }
-
        if (!con->sock)
                return -ENOTCONN;
 
@@ -907,8 +1011,10 @@ static int accept_from_sock(struct listen_connection *con)
         *  the same time and the connections cross on the wire.
         *  In this case we store the incoming one in "othercon"
         */
+       idx = srcu_read_lock(&connections_srcu);
        newcon = nodeid2con(nodeid, GFP_NOFS);
        if (!newcon) {
+               srcu_read_unlock(&connections_srcu, idx);
                result = -ENOMEM;
                goto accept_err;
        }
@@ -924,6 +1030,7 @@ static int accept_from_sock(struct listen_connection *con)
                        if (!othercon) {
                                log_print("failed to allocate incoming socket");
                                mutex_unlock(&newcon->sock_mutex);
+                               srcu_read_unlock(&connections_srcu, idx);
                                result = -ENOMEM;
                                goto accept_err;
                        }
@@ -932,11 +1039,14 @@ static int accept_from_sock(struct listen_connection *con)
                        if (result < 0) {
                                kfree(othercon);
                                mutex_unlock(&newcon->sock_mutex);
+                               srcu_read_unlock(&connections_srcu, idx);
                                goto accept_err;
                        }
 
                        lockdep_set_subclass(&othercon->sock_mutex, 1);
+                       set_bit(CF_IS_OTHERCON, &othercon->flags);
                        newcon->othercon = othercon;
+                       othercon->sendcon = newcon;
                } else {
                        /* close other sock con if we have something new */
                        close_connection(othercon, false, true, false);
@@ -966,6 +1076,8 @@ static int accept_from_sock(struct listen_connection *con)
        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
                queue_work(recv_workqueue, &addcon->rwork);
 
+       srcu_read_unlock(&connections_srcu, idx);
+
        return 0;
 
 accept_err:
@@ -977,12 +1089,6 @@ accept_err:
        return result;
 }
 
-static void free_entry(struct writequeue_entry *e)
-{
-       __free_page(e->page);
-       kfree(e);
-}
-
 /*
  * writequeue_entry_complete - try to delete and free write queue entry
  * @e: write queue entry to try to delete
@@ -994,11 +1100,11 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
 {
        e->offset += completed;
        e->len -= completed;
+       /* signal that the page was partially transmitted */
+       e->dirty = true;
 
-       if (e->len == 0 && e->users == 0) {
-               list_del(&e->list);
+       if (e->len == 0 && e->users == 0)
                free_entry(e);
-       }
 }
 
 /*
@@ -1075,7 +1181,7 @@ static void sctp_connect_to_sock(struct connection *con)
 
        make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
 
-       log_print("connecting to %d", con->nodeid);
+       log_print_ratelimited("connecting to %d", con->nodeid);
 
        /* Turn off Nagle's algorithm */
        sctp_sock_set_nodelay(sock->sk);
@@ -1171,7 +1277,7 @@ static void tcp_connect_to_sock(struct connection *con)
 
        make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
 
-       log_print("connecting to %d", con->nodeid);
+       log_print_ratelimited("connecting to %d", con->nodeid);
 
        /* Turn off Nagle's algorithm */
        tcp_sock_set_nodelay(sock->sk);
@@ -1364,12 +1470,16 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
 
        entry->con = con;
        entry->users = 1;
+       kref_init(&entry->ref);
+       INIT_LIST_HEAD(&entry->msgs);
 
        return entry;
 }
 
 static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
-                                            gfp_t allocation, char **ppc)
+                                            gfp_t allocation, char **ppc,
+                                            void (*cb)(struct dlm_mhandle *mh),
+                                            struct dlm_mhandle *mh)
 {
        struct writequeue_entry *e;
 
@@ -1377,7 +1487,12 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
        if (!list_empty(&con->writequeue)) {
                e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
                if (DLM_WQ_REMAIN_BYTES(e) >= len) {
+                       kref_get(&e->ref);
+
                        *ppc = page_address(e->page) + e->end;
+                       if (cb)
+                               cb(mh);
+
                        e->end += len;
                        e->users++;
                        spin_unlock(&con->writequeue_lock);
@@ -1391,42 +1506,92 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
        if (!e)
                return NULL;
 
+       kref_get(&e->ref);
        *ppc = page_address(e->page);
        e->end += len;
+       atomic_inc(&con->writequeue_cnt);
 
        spin_lock(&con->writequeue_lock);
+       if (cb)
+               cb(mh);
+
        list_add_tail(&e->list, &con->writequeue);
        spin_unlock(&con->writequeue_lock);
 
        return e;
 };
 
-void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
+static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
+                                               gfp_t allocation, char **ppc,
+                                               void (*cb)(struct dlm_mhandle *mh),
+                                               struct dlm_mhandle *mh)
+{
+       struct writequeue_entry *e;
+       struct dlm_msg *msg;
+
+       msg = kzalloc(sizeof(*msg), allocation);
+       if (!msg)
+               return NULL;
+
+       kref_init(&msg->ref);
+
+       e = new_wq_entry(con, len, allocation, ppc, cb, mh);
+       if (!e) {
+               kfree(msg);
+               return NULL;
+       }
+
+       msg->ppc = *ppc;
+       msg->len = len;
+       msg->entry = e;
+
+       return msg;
+}
+
+struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
+                                    char **ppc, void (*cb)(struct dlm_mhandle *mh),
+                                    struct dlm_mhandle *mh)
 {
        struct connection *con;
+       struct dlm_msg *msg;
+       int idx;
 
-       if (len > DEFAULT_BUFFER_SIZE ||
+       if (len > DLM_MAX_SOCKET_BUFSIZE ||
            len < sizeof(struct dlm_header)) {
-               BUILD_BUG_ON(PAGE_SIZE < DEFAULT_BUFFER_SIZE);
+               BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
                log_print("failed to allocate a buffer of size %d", len);
                WARN_ON(1);
                return NULL;
        }
 
+       idx = srcu_read_lock(&connections_srcu);
        con = nodeid2con(nodeid, allocation);
-       if (!con)
+       if (!con) {
+               srcu_read_unlock(&connections_srcu, idx);
                return NULL;
+       }
 
-       return new_wq_entry(con, len, allocation, ppc);
+       msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh);
+       if (!msg) {
+               srcu_read_unlock(&connections_srcu, idx);
+               return NULL;
+       }
+
+       /* we assume that on success the caller will call commit() */
+       msg->idx = idx;
+       return msg;
 }
 
-void dlm_lowcomms_commit_buffer(void *mh)
+static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
 {
-       struct writequeue_entry *e = (struct writequeue_entry *)mh;
+       struct writequeue_entry *e = msg->entry;
        struct connection *con = e->con;
        int users;
 
        spin_lock(&con->writequeue_lock);
+       kref_get(&msg->ref);
+       list_add(&msg->list, &e->msgs);
+
        users = --e->users;
        if (users)
                goto out;
@@ -1442,6 +1607,42 @@ out:
        return;
 }
 
+void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
+{
+       _dlm_lowcomms_commit_msg(msg);
+       srcu_read_unlock(&connections_srcu, msg->idx);
+}
+
+void dlm_lowcomms_put_msg(struct dlm_msg *msg)
+{
+       kref_put(&msg->ref, dlm_msg_release);
+}
+
+/* does not hold connections_srcu; for workqueue usage only */
+int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
+{
+       struct dlm_msg *msg_resend;
+       char *ppc;
+
+       if (msg->retransmit)
+               return 1;
+
+       msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
+                                             GFP_ATOMIC, &ppc, NULL, NULL);
+       if (!msg_resend)
+               return -ENOMEM;
+
+       msg->retransmit = true;
+       kref_get(&msg->ref);
+       msg_resend->orig_msg = msg;
+
+       memcpy(ppc, msg->ppc, msg->len);
+       _dlm_lowcomms_commit_msg(msg_resend);
+       dlm_lowcomms_put_msg(msg_resend);
+
+       return 0;
+}
+
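
A minimal usage sketch of the tracked-message API added above (the wrapper function below is hypothetical, not part of this patch): dlm_lowcomms_new_msg() takes the connections_srcu read lock and stashes the index in msg->idx, so a successful new() must be paired with dlm_lowcomms_commit_msg(), which queues the I/O and releases that lock; the caller then drops its own reference with dlm_lowcomms_put_msg(), or keeps the message around if it may need dlm_lowcomms_resend_msg() later. Passing NULL for cb/mh simply skips the under-lock callback.

/* hypothetical caller, for illustration only; not part of this patch */
static int dlm_send_example(int nodeid, const void *buf, int len)
{
	struct dlm_msg *msg;
	char *ppc;

	/* reserves len bytes of writequeue space; takes connections_srcu */
	msg = dlm_lowcomms_new_msg(nodeid, len, GFP_NOFS, &ppc, NULL, NULL);
	if (!msg)
		return -ENOMEM;

	memcpy(ppc, buf, len);		/* fill the reserved space */
	dlm_lowcomms_commit_msg(msg);	/* queue send, drop the srcu read lock */
	dlm_lowcomms_put_msg(msg);	/* drop our ref; keep it if a resend may follow */

	return 0;
}
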
 /* Send a message */
 static void send_to_sock(struct connection *con)
 {
@@ -1483,7 +1684,7 @@ static void send_to_sock(struct connection *con)
                                cond_resched();
                                goto out;
                        } else if (ret < 0)
-                               goto send_error;
+                               goto out;
                }
 
                /* Don't starve people filling buffers */
@@ -1496,16 +1697,23 @@ static void send_to_sock(struct connection *con)
                writequeue_entry_complete(e, ret);
        }
        spin_unlock(&con->writequeue_lock);
-out:
-       mutex_unlock(&con->sock_mutex);
+
+       /* close if we got EOF */
+       if (test_and_clear_bit(CF_EOF, &con->flags)) {
+               mutex_unlock(&con->sock_mutex);
+               close_connection(con, false, false, true);
+
+               /* handling for tcp shutdown */
+               clear_bit(CF_SHUTDOWN, &con->flags);
+               wake_up(&con->shutdown_wait);
+       } else {
+               mutex_unlock(&con->sock_mutex);
+       }
+
        return;
 
-send_error:
+out:
        mutex_unlock(&con->sock_mutex);
-       close_connection(con, false, false, true);
-       /* Requeue the send work. When the work daemon runs again, it will try
-          a new connection, then call this function again. */
-       queue_work(send_workqueue, &con->swork);
        return;
 
 out_connect:
@@ -1520,7 +1728,6 @@ static void clean_one_writequeue(struct connection *con)
 
        spin_lock(&con->writequeue_lock);
        list_for_each_entry_safe(e, safe, &con->writequeue, list) {
-               list_del(&e->list);
                free_entry(e);
        }
        spin_unlock(&con->writequeue_lock);
@@ -1532,8 +1739,10 @@ int dlm_lowcomms_close(int nodeid)
 {
        struct connection *con;
        struct dlm_node_addr *na;
+       int idx;
 
        log_print("closing connection to node %d", nodeid);
+       idx = srcu_read_lock(&connections_srcu);
        con = nodeid2con(nodeid, 0);
        if (con) {
                set_bit(CF_CLOSE, &con->flags);
@@ -1542,6 +1751,7 @@ int dlm_lowcomms_close(int nodeid)
                if (con->othercon)
                        clean_one_writequeue(con->othercon);
        }
+       srcu_read_unlock(&connections_srcu, idx);
 
        spin_lock(&dlm_node_addrs_spin);
        na = find_node_addr(nodeid);
@@ -1578,35 +1788,50 @@ static void process_send_sockets(struct work_struct *work)
 {
        struct connection *con = container_of(work, struct connection, swork);
 
+       WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));
+
        clear_bit(CF_WRITE_PENDING, &con->flags);
-       if (con->sock == NULL) /* not mutex protected so check it inside too */
+
+       if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
+               close_connection(con, false, false, true);
+               dlm_midcomms_unack_msg_resend(con->nodeid);
+       }
+
+       if (con->sock == NULL) { /* not mutex protected so check it inside too */
+               if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
+                       msleep(1000);
                con->connect_action(con);
+       }
        if (!list_empty(&con->writequeue))
                send_to_sock(con);
 }
 
 static void work_stop(void)
 {
-       if (recv_workqueue)
+       if (recv_workqueue) {
                destroy_workqueue(recv_workqueue);
-       if (send_workqueue)
+               recv_workqueue = NULL;
+       }
+
+       if (send_workqueue) {
                destroy_workqueue(send_workqueue);
+               send_workqueue = NULL;
+       }
 }
 
 static int work_start(void)
 {
-       recv_workqueue = alloc_workqueue("dlm_recv",
-                                        WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
+       recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM);
        if (!recv_workqueue) {
                log_print("can't start dlm_recv");
                return -ENOMEM;
        }
 
-       send_workqueue = alloc_workqueue("dlm_send",
-                                        WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
+       send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM);
        if (!send_workqueue) {
                log_print("can't start dlm_send");
                destroy_workqueue(recv_workqueue);
+               recv_workqueue = NULL;
                return -ENOMEM;
        }
 
@@ -1621,6 +1846,8 @@ static void shutdown_conn(struct connection *con)
 
 void dlm_lowcomms_shutdown(void)
 {
+       int idx;
+
        /* Set all the flags to prevent any
         * socket activity.
         */
@@ -1633,7 +1860,9 @@ void dlm_lowcomms_shutdown(void)
 
        dlm_close_sock(&listen_con.sock);
 
+       idx = srcu_read_lock(&connections_srcu);
        foreach_conn(shutdown_conn);
+       srcu_read_unlock(&connections_srcu, idx);
 }
 
 static void _stop_conn(struct connection *con, bool and_other)
@@ -1682,7 +1911,7 @@ static void free_conn(struct connection *con)
 
 static void work_flush(void)
 {
-       int ok, idx;
+       int ok;
        int i;
        struct connection *con;
 
@@ -1693,7 +1922,6 @@ static void work_flush(void)
                        flush_workqueue(recv_workqueue);
                if (send_workqueue)
                        flush_workqueue(send_workqueue);
-               idx = srcu_read_lock(&connections_srcu);
                for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
                        hlist_for_each_entry_rcu(con, &connection_hash[i],
                                                 list) {
@@ -1707,14 +1935,17 @@ static void work_flush(void)
                                }
                        }
                }
-               srcu_read_unlock(&connections_srcu, idx);
        } while (!ok);
 }
 
 void dlm_lowcomms_stop(void)
 {
+       int idx;
+
+       idx = srcu_read_lock(&connections_srcu);
        work_flush();
        foreach_conn(free_conn);
+       srcu_read_unlock(&connections_srcu, idx);
        work_stop();
        deinit_local();
 }
@@ -1738,15 +1969,24 @@ int dlm_lowcomms_start(void)
 
        error = work_start();
        if (error)
-               goto fail;
+               goto fail_local;
 
        dlm_allow_conn = 1;
 
        /* Start listening */
-       if (dlm_config.ci_protocol == 0)
+       switch (dlm_config.ci_protocol) {
+       case DLM_PROTO_TCP:
                error = tcp_listen_for_all();
-       else
+               break;
+       case DLM_PROTO_SCTP:
                error = sctp_listen_for_all(&listen_con);
+               break;
+       default:
+               log_print("Invalid protocol identifier %d set",
+                         dlm_config.ci_protocol);
+               error = -EINVAL;
+               break;
+       }
        if (error)
                goto fail_unlisten;
 
@@ -1755,6 +1995,9 @@ int dlm_lowcomms_start(void)
 fail_unlisten:
        dlm_allow_conn = 0;
        dlm_close_sock(&listen_con.sock);
+       work_stop();
+fail_local:
+       deinit_local();
 fail:
        return error;
 }
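
For reference, the DLM_PROTO_* identifiers tested in the switch statements above come from fs/dlm/config.h, which is not shown in this diff; presumably they are defined along these lines, keeping the old convention where ci_protocol 0 meant TCP and anything else SCTP:

/* assumed definitions in fs/dlm/config.h (not part of this hunk) */
#define DLM_PROTO_TCP	0
#define DLM_PROTO_SCTP	1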