980c58befd533802b0a6c3fde5c4f3cd5dcf3380
[linux-2.6-microblaze.git] / fs / dlm / lowcomms.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 /*
15  * lowcomms.c
16  *
17  * This is the "low-level" comms layer.
18  *
19  * It is responsible for sending/receiving messages
20  * from other nodes in the cluster.
21  *
22  * Cluster nodes are referred to by their nodeids. nodeids are
23  * simply 32 bit numbers to the locking module - if they need to
24  * be expanded for the cluster infrastructure then that is its
25  * responsibility. It is this layer's
26  * responsibility to resolve these into IP address or
27  * whatever it needs for inter-node communication.
28  *
 * The comms level consists of two kernel threads: a receive thread
 * that deals mainly with receiving messages from other nodes and
 * passing them up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and a send thread
 * that does all the setting up of connections to remote nodes and the
 * sending of data. Threads are not allowed to send their own data
 * because it may cause them to wait in times of high load. Also, this
 * way, the sending thread can collect together messages bound for one
 * node and send them in one block.
38  *
39  * lowcomms will choose to use either TCP or SCTP as its transport layer
40  * depending on the configuration variable 'protocol'. This should be set
41  * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
42  * cluster-wide mechanism as it must be the same on all nodes of the cluster
43  * for the DLM to function.
44  *
45  */
46
47 #include <asm/ioctls.h>
48 #include <net/sock.h>
49 #include <net/tcp.h>
50 #include <linux/pagemap.h>
51 #include <linux/file.h>
52 #include <linux/mutex.h>
53 #include <linux/sctp.h>
54 #include <linux/slab.h>
55 #include <net/sctp/sctp.h>
56 #include <net/ipv6.h>
57
58 #include "dlm_internal.h"
59 #include "lowcomms.h"
60 #include "midcomms.h"
61 #include "config.h"
62
/*
 * NOTE(review): not referenced in this part of the file; presumably a
 * receive buffer size in bytes (4 MiB) - confirm at its users.
 */
#define NEEDED_RMEM (4*1024*1024)
/*
 * Number of buckets in connection_hash. nodeid_hash() masks with
 * CONN_HASH_SIZE-1, so this value has to stay a power of two.
 */
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
68
/*
 * Offsets describing the used region of a circular buffer. Only the
 * bookkeeping lives here; the storage itself belongs to the user.
 * Wrapping is done by masking, which assumes the size handed to
 * cbuf_init() is a power of two.
 */
struct cbuf {
        unsigned int base;      /* offset of the first unconsumed byte */
        unsigned int len;       /* number of unconsumed bytes */
        unsigned int mask;      /* size - 1 */
};

/* Account for n bytes that were appended behind the current data. */
static void cbuf_add(struct cbuf *cb, int n)
{
        cb->len = cb->len + n;
}

/* Wrapped offset just past the last unconsumed byte (the write position). */
static int cbuf_data(struct cbuf *cb)
{
        unsigned int end = cb->base + cb->len;

        return end & cb->mask;
}

/* Reset to an empty buffer of the given size. */
static void cbuf_init(struct cbuf *cb, int size)
{
        cb->mask = size - 1;
        cb->len = 0;
        cb->base = 0;
}

/* Consume n bytes from the front of the data. */
static void cbuf_eat(struct cbuf *cb, int n)
{
        cb->base = (cb->base + n) & cb->mask;
        cb->len = cb->len - n;
}

/* True when no unconsumed bytes remain. */
static bool cbuf_empty(struct cbuf *cb)
{
        return !cb->len;
}
102
/*
 * State kept for the connection to one node. Entries created by
 * __nodeid2con() are linked into connection_hash; a second, incoming
 * connection from the same node is kept in 'othercon' instead.
 */
struct connection {
        struct socket *sock;    /* NULL if not connected */
        uint32_t nodeid;        /* So we know who we are in the list */
        struct mutex sock_mutex; /* held while sock/rx_page are used or torn down */
        unsigned long flags;    /* CF_* bit numbers below, used with the bitops */
#define CF_READ_PENDING 1       /* rwork has been queued */
#define CF_INIT_PENDING 4       /* NOTE(review): not referenced in this part of the file */
#define CF_IS_OTHERCON 5        /* set on a connection that is another one's othercon */
#define CF_CLOSE 6              /* lowcomms_connect_sock() will not queue send work */
#define CF_APP_LIMITED 7        /* cleared in lowcomms_write_space(), which then drops sk_write_pending */
        struct list_head writequeue;  /* List of outgoing writequeue_entries */
        spinlock_t writequeue_lock;
        int (*rx_action) (struct connection *); /* What to do when active */
        void (*connect_action) (struct connection *);   /* What to do to connect */
        struct page *rx_page;   /* page backing the receive circular buffer */
        struct cbuf cb;         /* used-region offsets within rx_page */
        int retries;            /* reset to 0 by close_connection() */
#define MAX_CONNECT_RETRIES 3
        struct hlist_node list; /* link in a connection_hash bucket */
        struct connection *othercon; /* incoming connection when connects crossed */
        struct work_struct rwork; /* Receive workqueue */
        struct work_struct swork; /* Send workqueue */
};
/* Connection attached to a struct sock through sk_user_data (see add_sock()) */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
127
/*
 * An entry waiting to be sent.
 *
 * NOTE(review): the code using these fields is outside this part of the
 * file; the per-field notes below are read off the names only - confirm.
 */
struct writequeue_entry {
        struct list_head list;  /* presumably the link in connection.writequeue */
        struct page *page;      /* presumably the page holding the outgoing bytes */
        int offset;
        int len;
        int end;
        int users;
        struct connection *con; /* presumably the connection this entry is queued on */
};
138
/*
 * The addresses configured for one node. Entries live on dlm_node_addrs
 * and are added/extended by dlm_lowcomms_addr() under dlm_node_addrs_spin.
 */
struct dlm_node_addr {
        struct list_head list;  /* link in dlm_node_addrs */
        int nodeid;
        int addr_count;         /* number of valid entries in addr[] */
        int curr_addr_index;    /* entry nodeid_to_addr() hands out; rotated on try_new_addr */
        struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT]; /* each one kzalloc'ed */
};
146
/*
 * Socket callbacks as they were before lowcomms replaced them. Filled in
 * by save_listen_callbacks(), written back by restore_callbacks(), and
 * sk_error_report is chained to from lowcomms_error_report().
 */
static struct listen_sock_callbacks {
        void (*sk_error_report)(struct sock *);
        void (*sk_data_ready)(struct sock *);
        void (*sk_state_change)(struct sock *);
        void (*sk_write_space)(struct sock *);
} listen_sock;
153
/* Known node addresses; the list is walked and modified under dlm_node_addrs_spin */
static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

/*
 * Addresses of the local node. The family of dlm_local_addr[0] selects the
 * address family used by the code below, and nodeid_to_addr() fails while
 * dlm_local_count is zero.
 */
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
/* Read under connections_lock by the accept paths; zero makes them refuse */
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

/* Connections hashed by nodeid; nodeid2con() looks up/inserts under connections_lock */
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
/* Cache the struct connection objects are allocated from */
static struct kmem_cache *con_cache;

/* Handlers installed on each connection's rwork and swork */
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
171
172
173 /* This is deliberately very simple because most clusters have simple
174    sequential nodeids, so we should be able to go straight to a connection
175    struct in the array */
176 static inline int nodeid_hash(int nodeid)
177 {
178         return nodeid & (CONN_HASH_SIZE-1);
179 }
180
181 static struct connection *__find_con(int nodeid)
182 {
183         int r;
184         struct connection *con;
185
186         r = nodeid_hash(nodeid);
187
188         hlist_for_each_entry(con, &connection_hash[r], list) {
189                 if (con->nodeid == nodeid)
190                         return con;
191         }
192         return NULL;
193 }
194
195 /*
196  * If 'allocation' is zero then we don't attempt to create a new
197  * connection structure for this node.
198  */
199 static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
200 {
201         struct connection *con = NULL;
202         int r;
203
204         con = __find_con(nodeid);
205         if (con || !alloc)
206                 return con;
207
208         con = kmem_cache_zalloc(con_cache, alloc);
209         if (!con)
210                 return NULL;
211
212         r = nodeid_hash(nodeid);
213         hlist_add_head(&con->list, &connection_hash[r]);
214
215         con->nodeid = nodeid;
216         mutex_init(&con->sock_mutex);
217         INIT_LIST_HEAD(&con->writequeue);
218         spin_lock_init(&con->writequeue_lock);
219         INIT_WORK(&con->swork, process_send_sockets);
220         INIT_WORK(&con->rwork, process_recv_sockets);
221
222         /* Setup action pointers for child sockets */
223         if (con->nodeid) {
224                 struct connection *zerocon = __find_con(0);
225
226                 con->connect_action = zerocon->connect_action;
227                 if (!con->rx_action)
228                         con->rx_action = zerocon->rx_action;
229         }
230
231         return con;
232 }
233
234 /* Loop round all connections */
235 static void foreach_conn(void (*conn_func)(struct connection *c))
236 {
237         int i;
238         struct hlist_node *n;
239         struct connection *con;
240
241         for (i = 0; i < CONN_HASH_SIZE; i++) {
242                 hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
243                         conn_func(con);
244         }
245 }
246
247 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
248 {
249         struct connection *con;
250
251         mutex_lock(&connections_lock);
252         con = __nodeid2con(nodeid, allocation);
253         mutex_unlock(&connections_lock);
254
255         return con;
256 }
257
258 static struct dlm_node_addr *find_node_addr(int nodeid)
259 {
260         struct dlm_node_addr *na;
261
262         list_for_each_entry(na, &dlm_node_addrs, list) {
263                 if (na->nodeid == nodeid)
264                         return na;
265         }
266         return NULL;
267 }
268
269 static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
270 {
271         switch (x->ss_family) {
272         case AF_INET: {
273                 struct sockaddr_in *sinx = (struct sockaddr_in *)x;
274                 struct sockaddr_in *siny = (struct sockaddr_in *)y;
275                 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
276                         return 0;
277                 if (sinx->sin_port != siny->sin_port)
278                         return 0;
279                 break;
280         }
281         case AF_INET6: {
282                 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
283                 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
284                 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
285                         return 0;
286                 if (sinx->sin6_port != siny->sin6_port)
287                         return 0;
288                 break;
289         }
290         default:
291                 return 0;
292         }
293         return 1;
294 }
295
296 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
297                           struct sockaddr *sa_out, bool try_new_addr)
298 {
299         struct sockaddr_storage sas;
300         struct dlm_node_addr *na;
301
302         if (!dlm_local_count)
303                 return -1;
304
305         spin_lock(&dlm_node_addrs_spin);
306         na = find_node_addr(nodeid);
307         if (na && na->addr_count) {
308                 memcpy(&sas, na->addr[na->curr_addr_index],
309                        sizeof(struct sockaddr_storage));
310
311                 if (try_new_addr) {
312                         na->curr_addr_index++;
313                         if (na->curr_addr_index == na->addr_count)
314                                 na->curr_addr_index = 0;
315                 }
316         }
317         spin_unlock(&dlm_node_addrs_spin);
318
319         if (!na)
320                 return -EEXIST;
321
322         if (!na->addr_count)
323                 return -ENOENT;
324
325         if (sas_out)
326                 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
327
328         if (!sa_out)
329                 return 0;
330
331         if (dlm_local_addr[0]->ss_family == AF_INET) {
332                 struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
333                 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
334                 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
335         } else {
336                 struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
337                 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
338                 ret6->sin6_addr = in6->sin6_addr;
339         }
340
341         return 0;
342 }
343
344 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
345 {
346         struct dlm_node_addr *na;
347         int rv = -EEXIST;
348         int addr_i;
349
350         spin_lock(&dlm_node_addrs_spin);
351         list_for_each_entry(na, &dlm_node_addrs, list) {
352                 if (!na->addr_count)
353                         continue;
354
355                 for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
356                         if (addr_compare(na->addr[addr_i], addr)) {
357                                 *nodeid = na->nodeid;
358                                 rv = 0;
359                                 goto unlock;
360                         }
361                 }
362         }
363 unlock:
364         spin_unlock(&dlm_node_addrs_spin);
365         return rv;
366 }
367
368 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
369 {
370         struct sockaddr_storage *new_addr;
371         struct dlm_node_addr *new_node, *na;
372
373         new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
374         if (!new_node)
375                 return -ENOMEM;
376
377         new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
378         if (!new_addr) {
379                 kfree(new_node);
380                 return -ENOMEM;
381         }
382
383         memcpy(new_addr, addr, len);
384
385         spin_lock(&dlm_node_addrs_spin);
386         na = find_node_addr(nodeid);
387         if (!na) {
388                 new_node->nodeid = nodeid;
389                 new_node->addr[0] = new_addr;
390                 new_node->addr_count = 1;
391                 list_add(&new_node->list, &dlm_node_addrs);
392                 spin_unlock(&dlm_node_addrs_spin);
393                 return 0;
394         }
395
396         if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
397                 spin_unlock(&dlm_node_addrs_spin);
398                 kfree(new_addr);
399                 kfree(new_node);
400                 return -ENOSPC;
401         }
402
403         na->addr[na->addr_count++] = new_addr;
404         spin_unlock(&dlm_node_addrs_spin);
405         kfree(new_node);
406         return 0;
407 }
408
409 /* Data available on socket or listen socket received a connect */
410 static void lowcomms_data_ready(struct sock *sk)
411 {
412         struct connection *con = sock2con(sk);
413         if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
414                 queue_work(recv_workqueue, &con->rwork);
415 }
416
417 static void lowcomms_write_space(struct sock *sk)
418 {
419         struct connection *con = sock2con(sk);
420
421         if (!con)
422                 return;
423
424         clear_bit(SOCK_NOSPACE, &con->sock->flags);
425
426         if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
427                 con->sock->sk->sk_write_pending--;
428                 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
429         }
430
431         queue_work(send_workqueue, &con->swork);
432 }
433
434 static inline void lowcomms_connect_sock(struct connection *con)
435 {
436         if (test_bit(CF_CLOSE, &con->flags))
437                 return;
438         queue_work(send_workqueue, &con->swork);
439         cond_resched();
440 }
441
442 static void lowcomms_state_change(struct sock *sk)
443 {
444         /* SCTP layer is not calling sk_data_ready when the connection
445          * is done, so we catch the signal through here. Also, it
446          * doesn't switch socket state when entering shutdown, so we
447          * skip the write in that case.
448          */
449         if (sk->sk_shutdown) {
450                 if (sk->sk_shutdown == RCV_SHUTDOWN)
451                         lowcomms_data_ready(sk);
452         } else if (sk->sk_state == TCP_ESTABLISHED) {
453                 lowcomms_write_space(sk);
454         }
455 }
456
457 int dlm_lowcomms_connect_node(int nodeid)
458 {
459         struct connection *con;
460
461         if (nodeid == dlm_our_nodeid())
462                 return 0;
463
464         con = nodeid2con(nodeid, GFP_NOFS);
465         if (!con)
466                 return -ENOMEM;
467         lowcomms_connect_sock(con);
468         return 0;
469 }
470
471 static void lowcomms_error_report(struct sock *sk)
472 {
473         struct connection *con;
474         struct sockaddr_storage saddr;
475         int buflen;
476         void (*orig_report)(struct sock *) = NULL;
477
478         read_lock_bh(&sk->sk_callback_lock);
479         con = sock2con(sk);
480         if (con == NULL)
481                 goto out;
482
483         orig_report = listen_sock.sk_error_report;
484         if (con->sock == NULL ||
485             kernel_getpeername(con->sock, (struct sockaddr *)&saddr, &buflen)) {
486                 printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
487                                    "sending to node %d, port %d, "
488                                    "sk_err=%d/%d\n", dlm_our_nodeid(),
489                                    con->nodeid, dlm_config.ci_tcp_port,
490                                    sk->sk_err, sk->sk_err_soft);
491         } else if (saddr.ss_family == AF_INET) {
492                 struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;
493
494                 printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
495                                    "sending to node %d at %pI4, port %d, "
496                                    "sk_err=%d/%d\n", dlm_our_nodeid(),
497                                    con->nodeid, &sin4->sin_addr.s_addr,
498                                    dlm_config.ci_tcp_port, sk->sk_err,
499                                    sk->sk_err_soft);
500         } else {
501                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;
502
503                 printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
504                                    "sending to node %d at %u.%u.%u.%u, "
505                                    "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
506                                    con->nodeid, sin6->sin6_addr.s6_addr32[0],
507                                    sin6->sin6_addr.s6_addr32[1],
508                                    sin6->sin6_addr.s6_addr32[2],
509                                    sin6->sin6_addr.s6_addr32[3],
510                                    dlm_config.ci_tcp_port, sk->sk_err,
511                                    sk->sk_err_soft);
512         }
513 out:
514         read_unlock_bh(&sk->sk_callback_lock);
515         if (orig_report)
516                 orig_report(sk);
517 }
518
519 /* Note: sk_callback_lock must be locked before calling this function. */
520 static void save_listen_callbacks(struct socket *sock)
521 {
522         struct sock *sk = sock->sk;
523
524         listen_sock.sk_data_ready = sk->sk_data_ready;
525         listen_sock.sk_state_change = sk->sk_state_change;
526         listen_sock.sk_write_space = sk->sk_write_space;
527         listen_sock.sk_error_report = sk->sk_error_report;
528 }
529
530 static void restore_callbacks(struct socket *sock)
531 {
532         struct sock *sk = sock->sk;
533
534         write_lock_bh(&sk->sk_callback_lock);
535         sk->sk_user_data = NULL;
536         sk->sk_data_ready = listen_sock.sk_data_ready;
537         sk->sk_state_change = listen_sock.sk_state_change;
538         sk->sk_write_space = listen_sock.sk_write_space;
539         sk->sk_error_report = listen_sock.sk_error_report;
540         write_unlock_bh(&sk->sk_callback_lock);
541 }
542
543 /* Make a socket active */
544 static void add_sock(struct socket *sock, struct connection *con)
545 {
546         struct sock *sk = sock->sk;
547
548         write_lock_bh(&sk->sk_callback_lock);
549         con->sock = sock;
550
551         sk->sk_user_data = con;
552         /* Install a data_ready callback */
553         sk->sk_data_ready = lowcomms_data_ready;
554         sk->sk_write_space = lowcomms_write_space;
555         sk->sk_state_change = lowcomms_state_change;
556         sk->sk_allocation = GFP_NOFS;
557         sk->sk_error_report = lowcomms_error_report;
558         write_unlock_bh(&sk->sk_callback_lock);
559 }
560
561 /* Add the port number to an IPv6 or 4 sockaddr and return the address
562    length */
563 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
564                           int *addr_len)
565 {
566         saddr->ss_family =  dlm_local_addr[0]->ss_family;
567         if (saddr->ss_family == AF_INET) {
568                 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
569                 in4_addr->sin_port = cpu_to_be16(port);
570                 *addr_len = sizeof(struct sockaddr_in);
571                 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
572         } else {
573                 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
574                 in6_addr->sin6_port = cpu_to_be16(port);
575                 *addr_len = sizeof(struct sockaddr_in6);
576         }
577         memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
578 }
579
580 /* Close a remote connection and tidy up */
581 static void close_connection(struct connection *con, bool and_other,
582                              bool tx, bool rx)
583 {
584         if (tx && cancel_work_sync(&con->swork))
585                 log_print("canceled swork for node %d", con->nodeid);
586         if (rx && cancel_work_sync(&con->rwork))
587                 log_print("canceled rwork for node %d", con->nodeid);
588
589         mutex_lock(&con->sock_mutex);
590         if (con->sock) {
591                 restore_callbacks(con->sock);
592                 sock_release(con->sock);
593                 con->sock = NULL;
594         }
595         if (con->othercon && and_other) {
596                 /* Will only re-enter once. */
597                 close_connection(con->othercon, false, true, true);
598         }
599         if (con->rx_page) {
600                 __free_page(con->rx_page);
601                 con->rx_page = NULL;
602         }
603
604         con->retries = 0;
605         mutex_unlock(&con->sock_mutex);
606 }
607
/*
 * Data received from remote end.
 *
 * Reads whatever is available on con->sock (non-blocking) into the
 * connection's circular buffer (rx_page/cb) and passes the buffered bytes
 * to dlm_process_incoming_buffer(), consuming as many as it reports.
 *
 * Returns 0 when the read was handled and no immediate re-read is needed.
 * Returns -EAGAIN when there is no socket, the page allocation failed,
 * another pass was queued because the read filled all the offered space,
 * or the peer closed the connection (EOF). Otherwise returns the negative
 * error from the checks below, kernel_recvmsg() or
 * dlm_process_incoming_buffer().
 *
 * On the error path the connection is closed unless the result at that
 * point is -EAGAIN; this includes EOF, which is reported as -EAGAIN only
 * after the close.
 */
static int receive_from_sock(struct connection *con)
{
        int ret = 0;
        struct msghdr msg = {};
        struct kvec iov[2];
        unsigned len;
        int r;
        int call_again_soon = 0;
        int nvec;

        mutex_lock(&con->sock_mutex);

        if (con->sock == NULL) {
                ret = -EAGAIN;
                goto out_close;
        }
        /* Receiving on the nodeid 0 connection is treated as an error */
        if (con->nodeid == 0) {
                ret = -EINVAL;
                goto out_close;
        }

        if (con->rx_page == NULL) {
                /*
                 * This doesn't need to be atomic, but I think it should
                 * improve performance if it is.
                 */
                con->rx_page = alloc_page(GFP_ATOMIC);
                /* No page right now: requeue the work and retry later */
                if (con->rx_page == NULL)
                        goto out_resched;
                cbuf_init(&con->cb, PAGE_SIZE);
        }

        /*
         * Start by assuming the used region wraps around the end of the
         * page: the only free space is then the gap between the write
         * position (cbuf_data()) and cb.base, described by iov[0] alone.
         */
        iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
        iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
        iov[1].iov_len = 0;
        nvec = 1;

        /*
         * If the write position is not below cb.base the used region does
         * not wrap: iov[0] then runs from the write position to the end of
         * the page, and iov[1] is the bit of the circular buffer between
         * the start of the buffer and the start of the currently used
         * section (cb.base).
         */
        if (cbuf_data(&con->cb) >= con->cb.base) {
                iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb);
                iov[1].iov_len = con->cb.base;
                iov[1].iov_base = page_address(con->rx_page);
                nvec = 2;
        }
        len = iov[0].iov_len + iov[1].iov_len;

        /* r keeps the byte count for the diagnostic below; ret is reused */
        r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
                               MSG_DONTWAIT | MSG_NOSIGNAL);
        if (ret <= 0)
                goto out_close;
        else if (ret == len)
                /* All offered space was filled, more data may be waiting */
                call_again_soon = 1;

        cbuf_add(&con->cb, ret);
        ret = dlm_process_incoming_buffer(con->nodeid,
                                          page_address(con->rx_page),
                                          con->cb.base, con->cb.len,
                                          PAGE_SIZE);
        if (ret == -EBADMSG) {
                log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
                          page_address(con->rx_page), con->cb.base,
                          con->cb.len, r);
        }
        if (ret < 0)
                goto out_close;
        /* A non-negative result is the number of bytes consumed */
        cbuf_eat(&con->cb, ret);

        /* Give the page back once drained, unless we are about to read again */
        if (cbuf_empty(&con->cb) && !call_again_soon) {
                __free_page(con->rx_page);
                con->rx_page = NULL;
        }

        if (call_again_soon)
                goto out_resched;
        mutex_unlock(&con->sock_mutex);
        return 0;

out_resched:
        /* Queue another pass, unless one is already marked pending */
        if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
                queue_work(recv_workqueue, &con->rwork);
        mutex_unlock(&con->sock_mutex);
        return -EAGAIN;

out_close:
        /* close_connection() takes sock_mutex itself, so drop it first */
        mutex_unlock(&con->sock_mutex);
        if (ret != -EAGAIN) {
                /* tx=true, rx=false: only the send work is cancelled */
                close_connection(con, false, true, false);
                /* Reconnect when there is something to send */
        }
        /* Don't return success if we really got EOF */
        if (ret == 0)
                ret = -EAGAIN;

        return ret;
}
711
/*
 * Listening socket is busy, accept a connection.
 *
 * Accepts one connection on con->sock, maps the peer address to a nodeid
 * and attaches the new socket either to that node's connection or, when
 * the node already has a socket, to its 'othercon'. Receive work is then
 * queued for the connection that got the socket.
 *
 * Returns 0 on success. Returns -1 when connections are currently not
 * allowed or the peer is not a known cluster node, -ENOMEM when the socket
 * cannot be created (whatever sock_create_lite() reported) or a connection
 * structure cannot be allocated, -ENOTCONN when the listening connection
 * has no socket, -ECONNABORTED when the peer name cannot be read, -EAGAIN
 * when the node already has both connections, or the error returned by
 * the socket's accept operation.
 */
static int tcp_accept_from_sock(struct connection *con)
{
        int result;
        struct sockaddr_storage peeraddr;
        struct socket *newsock;
        int len;
        int nodeid;
        struct connection *newcon;
        struct connection *addcon;

        mutex_lock(&connections_lock);
        if (!dlm_allow_conn) {
                mutex_unlock(&connections_lock);
                return -1;
        }
        mutex_unlock(&connections_lock);

        memset(&peeraddr, 0, sizeof(peeraddr));
        result = sock_create_lite(dlm_local_addr[0]->ss_family,
                                  SOCK_STREAM, IPPROTO_TCP, &newsock);
        if (result < 0)
                return -ENOMEM;

        /* Lockdep subclass 0 here; the per-node mutex below uses subclass 1 */
        mutex_lock_nested(&con->sock_mutex, 0);

        result = -ENOTCONN;
        if (con->sock == NULL)
                goto accept_err;

        /* The lite socket takes its type and ops from the listening socket */
        newsock->type = con->sock->type;
        newsock->ops = con->sock->ops;

        result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK, true);
        if (result < 0)
                goto accept_err;

        /* Get the connected socket's peer */
        memset(&peeraddr, 0, sizeof(peeraddr));
        if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
                                  &len, 2)) {
                result = -ECONNABORTED;
                goto accept_err;
        }

        /* Get the new node's NODEID */
        /* The port is forced to 0 before the lookup, as addr_compare() compares ports too */
        make_sockaddr(&peeraddr, 0, &len);
        if (addr_to_nodeid(&peeraddr, &nodeid)) {
                unsigned char *b=(unsigned char *)&peeraddr;
                log_print("connect from non cluster node");
                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
                                     b, sizeof(struct sockaddr_storage));
                sock_release(newsock);
                mutex_unlock(&con->sock_mutex);
                return -1;
        }

        log_print("got connection from %d", nodeid);

        /*  Check to see if we already have a connection to this node. This
         *  could happen if the two nodes initiate a connection at roughly
         *  the same time and the connections cross on the wire.
         *  In this case we store the incoming one in "othercon"
         */
        newcon = nodeid2con(nodeid, GFP_NOFS);
        if (!newcon) {
                result = -ENOMEM;
                goto accept_err;
        }
        mutex_lock_nested(&newcon->sock_mutex, 1);
        if (newcon->sock) {
                struct connection *othercon = newcon->othercon;

                /* First crossing connect for this node: create its othercon */
                if (!othercon) {
                        othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
                        if (!othercon) {
                                log_print("failed to allocate incoming socket");
                                mutex_unlock(&newcon->sock_mutex);
                                result = -ENOMEM;
                                goto accept_err;
                        }
                        othercon->nodeid = nodeid;
                        othercon->rx_action = receive_from_sock;
                        mutex_init(&othercon->sock_mutex);
                        INIT_WORK(&othercon->swork, process_send_sockets);
                        INIT_WORK(&othercon->rwork, process_recv_sockets);
                        set_bit(CF_IS_OTHERCON, &othercon->flags);
                }
                if (!othercon->sock) {
                        newcon->othercon = othercon;
                        /* add_sock() repeats these two stores under sk_callback_lock */
                        othercon->sock = newsock;
                        newsock->sk->sk_user_data = othercon;
                        add_sock(newsock, othercon);
                        addcon = othercon;
                }
                else {
                        /* Both connections to this node already have a socket */
                        printk("Extra connection from node %d attempted\n", nodeid);
                        result = -EAGAIN;
                        mutex_unlock(&newcon->sock_mutex);
                        goto accept_err;
                }
        }
        else {
                newsock->sk->sk_user_data = newcon;
                newcon->rx_action = receive_from_sock;
                /* accept copies the sk after we've saved the callbacks, so we
                   don't want to save them a second time or comm errors will
                   result in calling sk_error_report recursively. */
                add_sock(newsock, newcon);
                addcon = newcon;
        }

        mutex_unlock(&newcon->sock_mutex);

        /*
         * Queue receive work for the connection that got the socket, in
         * case data arrived between the accept and the socket being
         * attached to it.
         */
        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
                queue_work(recv_workqueue, &addcon->rwork);
        mutex_unlock(&con->sock_mutex);

        return 0;

accept_err:
        /* Common failure exit: drop the listening mutex and the new socket */
        mutex_unlock(&con->sock_mutex);
        sock_release(newsock);

        if (result != -EAGAIN)
                log_print("error accepting connection from node: %d", result);
        return result;
}
845
846 static int sctp_accept_from_sock(struct connection *con)
847 {
848         /* Check that the new node is in the lockspace */
849         struct sctp_prim prim;
850         int nodeid;
851         int prim_len, ret;
852         int addr_len;
853         struct connection *newcon;
854         struct connection *addcon;
855         struct socket *newsock;
856
857         mutex_lock(&connections_lock);
858         if (!dlm_allow_conn) {
859                 mutex_unlock(&connections_lock);
860                 return -1;
861         }
862         mutex_unlock(&connections_lock);
863
864         mutex_lock_nested(&con->sock_mutex, 0);
865
866         ret = kernel_accept(con->sock, &newsock, O_NONBLOCK);
867         if (ret < 0)
868                 goto accept_err;
869
870         memset(&prim, 0, sizeof(struct sctp_prim));
871         prim_len = sizeof(struct sctp_prim);
872
873         ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
874                                 (char *)&prim, &prim_len);
875         if (ret < 0) {
876                 log_print("getsockopt/sctp_primary_addr failed: %d", ret);
877                 goto accept_err;
878         }
879
880         make_sockaddr(&prim.ssp_addr, 0, &addr_len);
881         ret = addr_to_nodeid(&prim.ssp_addr, &nodeid);
882         if (ret) {
883                 unsigned char *b = (unsigned char *)&prim.ssp_addr;
884
885                 log_print("reject connect from unknown addr");
886                 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
887                                      b, sizeof(struct sockaddr_storage));
888                 goto accept_err;
889         }
890
891         newcon = nodeid2con(nodeid, GFP_NOFS);
892         if (!newcon) {
893                 ret = -ENOMEM;
894                 goto accept_err;
895         }
896
897         mutex_lock_nested(&newcon->sock_mutex, 1);
898
899         if (newcon->sock) {
900                 struct connection *othercon = newcon->othercon;
901
902                 if (!othercon) {
903                         othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
904                         if (!othercon) {
905                                 log_print("failed to allocate incoming socket");
906                                 mutex_unlock(&newcon->sock_mutex);
907                                 ret = -ENOMEM;
908                                 goto accept_err;
909                         }
910                         othercon->nodeid = nodeid;
911                         othercon->rx_action = receive_from_sock;
912                         mutex_init(&othercon->sock_mutex);
913                         INIT_WORK(&othercon->swork, process_send_sockets);
914                         INIT_WORK(&othercon->rwork, process_recv_sockets);
915                         set_bit(CF_IS_OTHERCON, &othercon->flags);
916                 }
917                 if (!othercon->sock) {
918                         newcon->othercon = othercon;
919                         othercon->sock = newsock;
920                         newsock->sk->sk_user_data = othercon;
921                         add_sock(newsock, othercon);
922                         addcon = othercon;
923                 } else {
924                         printk("Extra connection from node %d attempted\n", nodeid);
925                         ret = -EAGAIN;
926                         mutex_unlock(&newcon->sock_mutex);
927                         goto accept_err;
928                 }
929         } else {
930                 newsock->sk->sk_user_data = newcon;
931                 newcon->rx_action = receive_from_sock;
932                 add_sock(newsock, newcon);
933                 addcon = newcon;
934         }
935
936         log_print("connected to %d", nodeid);
937
938         mutex_unlock(&newcon->sock_mutex);
939
940         /*
941          * Add it to the active queue in case we got data
942          * between processing the accept adding the socket
943          * to the read_sockets list
944          */
945         if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
946                 queue_work(recv_workqueue, &addcon->rwork);
947         mutex_unlock(&con->sock_mutex);
948
949         return 0;
950
951 accept_err:
952         mutex_unlock(&con->sock_mutex);
953         if (newsock)
954                 sock_release(newsock);
955         if (ret != -EAGAIN)
956                 log_print("error accepting connection from node: %d", ret);
957
958         return ret;
959 }
960
961 static void free_entry(struct writequeue_entry *e)
962 {
963         __free_page(e->page);
964         kfree(e);
965 }
966
967 /*
968  * writequeue_entry_complete - try to delete and free write queue entry
969  * @e: write queue entry to try to delete
970  * @completed: bytes completed
971  *
972  * writequeue_lock must be held.
973  */
974 static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
975 {
976         e->offset += completed;
977         e->len -= completed;
978
979         if (e->len == 0 && e->users == 0) {
980                 list_del(&e->list);
981                 free_entry(e);
982         }
983 }
984
985 /*
986  * sctp_bind_addrs - bind a SCTP socket to all our addresses
987  */
988 static int sctp_bind_addrs(struct connection *con, uint16_t port)
989 {
990         struct sockaddr_storage localaddr;
991         int i, addr_len, result = 0;
992
993         for (i = 0; i < dlm_local_count; i++) {
994                 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
995                 make_sockaddr(&localaddr, port, &addr_len);
996
997                 if (!i)
998                         result = kernel_bind(con->sock,
999                                              (struct sockaddr *)&localaddr,
1000                                              addr_len);
1001                 else
1002                         result = kernel_setsockopt(con->sock, SOL_SCTP,
1003                                                    SCTP_SOCKOPT_BINDX_ADD,
1004                                                    (char *)&localaddr, addr_len);
1005
1006                 if (result < 0) {
1007                         log_print("Can't bind to %d addr number %d, %d.\n",
1008                                   port, i + 1, result);
1009                         break;
1010                 }
1011         }
1012         return result;
1013 }
1014
1015 /* Initiate an SCTP association.
1016    This is a special case of send_to_sock() in that we don't yet have a
1017    peeled-off socket for this association, so we use the listening socket
1018    and add the primary IP address of the remote node.
1019  */
1020 static void sctp_connect_to_sock(struct connection *con)
1021 {
1022         struct sockaddr_storage daddr;
1023         int one = 1;
1024         int result;
1025         int addr_len;
1026         struct socket *sock;
1027
1028         if (con->nodeid == 0) {
1029                 log_print("attempt to connect sock 0 foiled");
1030                 return;
1031         }
1032
1033         mutex_lock(&con->sock_mutex);
1034
1035         /* Some odd races can cause double-connects, ignore them */
1036         if (con->retries++ > MAX_CONNECT_RETRIES)
1037                 goto out;
1038
1039         if (con->sock) {
1040                 log_print("node %d already connected.", con->nodeid);
1041                 goto out;
1042         }
1043
1044         memset(&daddr, 0, sizeof(daddr));
1045         result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
1046         if (result < 0) {
1047                 log_print("no address for nodeid %d", con->nodeid);
1048                 goto out;
1049         }
1050
1051         /* Create a socket to communicate with */
1052         result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1053                                   SOCK_STREAM, IPPROTO_SCTP, &sock);
1054         if (result < 0)
1055                 goto socket_err;
1056
1057         sock->sk->sk_user_data = con;
1058         con->rx_action = receive_from_sock;
1059         con->connect_action = sctp_connect_to_sock;
1060         add_sock(sock, con);
1061
1062         /* Bind to all addresses. */
1063         if (sctp_bind_addrs(con, 0))
1064                 goto bind_err;
1065
1066         make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
1067
1068         log_print("connecting to %d", con->nodeid);
1069
1070         /* Turn off Nagle's algorithm */
1071         kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1072                           sizeof(one));
1073
1074         result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
1075                                    O_NONBLOCK);
1076         if (result == -EINPROGRESS)
1077                 result = 0;
1078         if (result == 0)
1079                 goto out;
1080
1081 bind_err:
1082         con->sock = NULL;
1083         sock_release(sock);
1084
1085 socket_err:
1086         /*
1087          * Some errors are fatal and this list might need adjusting. For other
1088          * errors we try again until the max number of retries is reached.
1089          */
1090         if (result != -EHOSTUNREACH &&
1091             result != -ENETUNREACH &&
1092             result != -ENETDOWN &&
1093             result != -EINVAL &&
1094             result != -EPROTONOSUPPORT) {
1095                 log_print("connect %d try %d error %d", con->nodeid,
1096                           con->retries, result);
1097                 mutex_unlock(&con->sock_mutex);
1098                 msleep(1000);
1099                 lowcomms_connect_sock(con);
1100                 return;
1101         }
1102
1103 out:
1104         mutex_unlock(&con->sock_mutex);
1105 }
1106
1107 /* Connect a new socket to its peer */
1108 static void tcp_connect_to_sock(struct connection *con)
1109 {
1110         struct sockaddr_storage saddr, src_addr;
1111         int addr_len;
1112         struct socket *sock = NULL;
1113         int one = 1;
1114         int result;
1115
1116         if (con->nodeid == 0) {
1117                 log_print("attempt to connect sock 0 foiled");
1118                 return;
1119         }
1120
1121         mutex_lock(&con->sock_mutex);
1122         if (con->retries++ > MAX_CONNECT_RETRIES)
1123                 goto out;
1124
1125         /* Some odd races can cause double-connects, ignore them */
1126         if (con->sock)
1127                 goto out;
1128
1129         /* Create a socket to communicate with */
1130         result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1131                                   SOCK_STREAM, IPPROTO_TCP, &sock);
1132         if (result < 0)
1133                 goto out_err;
1134
1135         memset(&saddr, 0, sizeof(saddr));
1136         result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
1137         if (result < 0) {
1138                 log_print("no address for nodeid %d", con->nodeid);
1139                 goto out_err;
1140         }
1141
1142         sock->sk->sk_user_data = con;
1143         con->rx_action = receive_from_sock;
1144         con->connect_action = tcp_connect_to_sock;
1145         add_sock(sock, con);
1146
1147         /* Bind to our cluster-known address connecting to avoid
1148            routing problems */
1149         memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
1150         make_sockaddr(&src_addr, 0, &addr_len);
1151         result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
1152                                  addr_len);
1153         if (result < 0) {
1154                 log_print("could not bind for connect: %d", result);
1155                 /* This *may* not indicate a critical error */
1156         }
1157
1158         make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
1159
1160         log_print("connecting to %d", con->nodeid);
1161
1162         /* Turn off Nagle's algorithm */
1163         kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1164                           sizeof(one));
1165
1166         result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
1167                                    O_NONBLOCK);
1168         if (result == -EINPROGRESS)
1169                 result = 0;
1170         if (result == 0)
1171                 goto out;
1172
1173 out_err:
1174         if (con->sock) {
1175                 sock_release(con->sock);
1176                 con->sock = NULL;
1177         } else if (sock) {
1178                 sock_release(sock);
1179         }
1180         /*
1181          * Some errors are fatal and this list might need adjusting. For other
1182          * errors we try again until the max number of retries is reached.
1183          */
1184         if (result != -EHOSTUNREACH &&
1185             result != -ENETUNREACH &&
1186             result != -ENETDOWN && 
1187             result != -EINVAL &&
1188             result != -EPROTONOSUPPORT) {
1189                 log_print("connect %d try %d error %d", con->nodeid,
1190                           con->retries, result);
1191                 mutex_unlock(&con->sock_mutex);
1192                 msleep(1000);
1193                 lowcomms_connect_sock(con);
1194                 return;
1195         }
1196 out:
1197         mutex_unlock(&con->sock_mutex);
1198         return;
1199 }
1200
1201 static struct socket *tcp_create_listen_sock(struct connection *con,
1202                                              struct sockaddr_storage *saddr)
1203 {
1204         struct socket *sock = NULL;
1205         int result = 0;
1206         int one = 1;
1207         int addr_len;
1208
1209         if (dlm_local_addr[0]->ss_family == AF_INET)
1210                 addr_len = sizeof(struct sockaddr_in);
1211         else
1212                 addr_len = sizeof(struct sockaddr_in6);
1213
1214         /* Create a socket to communicate with */
1215         result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1216                                   SOCK_STREAM, IPPROTO_TCP, &sock);
1217         if (result < 0) {
1218                 log_print("Can't create listening comms socket");
1219                 goto create_out;
1220         }
1221
1222         /* Turn off Nagle's algorithm */
1223         kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1224                           sizeof(one));
1225
1226         result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1227                                    (char *)&one, sizeof(one));
1228
1229         if (result < 0) {
1230                 log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1231         }
1232         sock->sk->sk_user_data = con;
1233         save_listen_callbacks(sock);
1234         con->rx_action = tcp_accept_from_sock;
1235         con->connect_action = tcp_connect_to_sock;
1236
1237         /* Bind to our port */
1238         make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1239         result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1240         if (result < 0) {
1241                 log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1242                 sock_release(sock);
1243                 sock = NULL;
1244                 con->sock = NULL;
1245                 goto create_out;
1246         }
1247         result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
1248                                  (char *)&one, sizeof(one));
1249         if (result < 0) {
1250                 log_print("Set keepalive failed: %d", result);
1251         }
1252
1253         result = sock->ops->listen(sock, 5);
1254         if (result < 0) {
1255                 log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1256                 sock_release(sock);
1257                 sock = NULL;
1258                 goto create_out;
1259         }
1260
1261 create_out:
1262         return sock;
1263 }
1264
1265 /* Get local addresses */
1266 static void init_local(void)
1267 {
1268         struct sockaddr_storage sas, *addr;
1269         int i;
1270
1271         dlm_local_count = 0;
1272         for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1273                 if (dlm_our_addr(&sas, i))
1274                         break;
1275
1276                 addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
1277                 if (!addr)
1278                         break;
1279                 dlm_local_addr[dlm_local_count++] = addr;
1280         }
1281 }
1282
1283 /* Initialise SCTP socket and bind to all interfaces */
1284 static int sctp_listen_for_all(void)
1285 {
1286         struct socket *sock = NULL;
1287         int result = -EINVAL;
1288         struct connection *con = nodeid2con(0, GFP_NOFS);
1289         int bufsize = NEEDED_RMEM;
1290         int one = 1;
1291
1292         if (!con)
1293                 return -ENOMEM;
1294
1295         log_print("Using SCTP for communications");
1296
1297         result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1298                                   SOCK_STREAM, IPPROTO_SCTP, &sock);
1299         if (result < 0) {
1300                 log_print("Can't create comms socket, check SCTP is loaded");
1301                 goto out;
1302         }
1303
1304         result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1305                                  (char *)&bufsize, sizeof(bufsize));
1306         if (result)
1307                 log_print("Error increasing buffer space on socket %d", result);
1308
1309         result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
1310                                    sizeof(one));
1311         if (result < 0)
1312                 log_print("Could not set SCTP NODELAY error %d\n", result);
1313
1314         write_lock_bh(&sock->sk->sk_callback_lock);
1315         /* Init con struct */
1316         sock->sk->sk_user_data = con;
1317         save_listen_callbacks(sock);
1318         con->sock = sock;
1319         con->sock->sk->sk_data_ready = lowcomms_data_ready;
1320         con->rx_action = sctp_accept_from_sock;
1321         con->connect_action = sctp_connect_to_sock;
1322
1323         write_unlock_bh(&sock->sk->sk_callback_lock);
1324
1325         /* Bind to all addresses. */
1326         if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
1327                 goto create_delsock;
1328
1329         result = sock->ops->listen(sock, 5);
1330         if (result < 0) {
1331                 log_print("Can't set socket listening");
1332                 goto create_delsock;
1333         }
1334
1335         return 0;
1336
1337 create_delsock:
1338         sock_release(sock);
1339         con->sock = NULL;
1340 out:
1341         return result;
1342 }
1343
1344 static int tcp_listen_for_all(void)
1345 {
1346         struct socket *sock = NULL;
1347         struct connection *con = nodeid2con(0, GFP_NOFS);
1348         int result = -EINVAL;
1349
1350         if (!con)
1351                 return -ENOMEM;
1352
1353         /* We don't support multi-homed hosts */
1354         if (dlm_local_addr[1] != NULL) {
1355                 log_print("TCP protocol can't handle multi-homed hosts, "
1356                           "try SCTP");
1357                 return -EINVAL;
1358         }
1359
1360         log_print("Using TCP for communications");
1361
1362         sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1363         if (sock) {
1364                 add_sock(sock, con);
1365                 result = 0;
1366         }
1367         else {
1368                 result = -EADDRINUSE;
1369         }
1370
1371         return result;
1372 }
1373
1374
1375
1376 static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1377                                                      gfp_t allocation)
1378 {
1379         struct writequeue_entry *entry;
1380
1381         entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1382         if (!entry)
1383                 return NULL;
1384
1385         entry->page = alloc_page(allocation);
1386         if (!entry->page) {
1387                 kfree(entry);
1388                 return NULL;
1389         }
1390
1391         entry->offset = 0;
1392         entry->len = 0;
1393         entry->end = 0;
1394         entry->users = 0;
1395         entry->con = con;
1396
1397         return entry;
1398 }
1399
1400 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1401 {
1402         struct connection *con;
1403         struct writequeue_entry *e;
1404         int offset = 0;
1405
1406         con = nodeid2con(nodeid, allocation);
1407         if (!con)
1408                 return NULL;
1409
1410         spin_lock(&con->writequeue_lock);
1411         e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1412         if ((&e->list == &con->writequeue) ||
1413             (PAGE_SIZE - e->end < len)) {
1414                 e = NULL;
1415         } else {
1416                 offset = e->end;
1417                 e->end += len;
1418                 e->users++;
1419         }
1420         spin_unlock(&con->writequeue_lock);
1421
1422         if (e) {
1423         got_one:
1424                 *ppc = page_address(e->page) + offset;
1425                 return e;
1426         }
1427
1428         e = new_writequeue_entry(con, allocation);
1429         if (e) {
1430                 spin_lock(&con->writequeue_lock);
1431                 offset = e->end;
1432                 e->end += len;
1433                 e->users++;
1434                 list_add_tail(&e->list, &con->writequeue);
1435                 spin_unlock(&con->writequeue_lock);
1436                 goto got_one;
1437         }
1438         return NULL;
1439 }
1440
1441 void dlm_lowcomms_commit_buffer(void *mh)
1442 {
1443         struct writequeue_entry *e = (struct writequeue_entry *)mh;
1444         struct connection *con = e->con;
1445         int users;
1446
1447         spin_lock(&con->writequeue_lock);
1448         users = --e->users;
1449         if (users)
1450                 goto out;
1451         e->len = e->end - e->offset;
1452         spin_unlock(&con->writequeue_lock);
1453
1454         queue_work(send_workqueue, &con->swork);
1455         return;
1456
1457 out:
1458         spin_unlock(&con->writequeue_lock);
1459         return;
1460 }
1461
1462 /* Send a message */
1463 static void send_to_sock(struct connection *con)
1464 {
1465         int ret = 0;
1466         const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1467         struct writequeue_entry *e;
1468         int len, offset;
1469         int count = 0;
1470
1471         mutex_lock(&con->sock_mutex);
1472         if (con->sock == NULL)
1473                 goto out_connect;
1474
1475         spin_lock(&con->writequeue_lock);
1476         for (;;) {
1477                 e = list_entry(con->writequeue.next, struct writequeue_entry,
1478                                list);
1479                 if ((struct list_head *) e == &con->writequeue)
1480                         break;
1481
1482                 len = e->len;
1483                 offset = e->offset;
1484                 BUG_ON(len == 0 && e->users == 0);
1485                 spin_unlock(&con->writequeue_lock);
1486
1487                 ret = 0;
1488                 if (len) {
1489                         ret = kernel_sendpage(con->sock, e->page, offset, len,
1490                                               msg_flags);
1491                         if (ret == -EAGAIN || ret == 0) {
1492                                 if (ret == -EAGAIN &&
1493                                     test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
1494                                     !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1495                                         /* Notify TCP that we're limited by the
1496                                          * application window size.
1497                                          */
1498                                         set_bit(SOCK_NOSPACE, &con->sock->flags);
1499                                         con->sock->sk->sk_write_pending++;
1500                                 }
1501                                 cond_resched();
1502                                 goto out;
1503                         } else if (ret < 0)
1504                                 goto send_error;
1505                 }
1506
1507                 /* Don't starve people filling buffers */
1508                 if (++count >= MAX_SEND_MSG_COUNT) {
1509                         cond_resched();
1510                         count = 0;
1511                 }
1512
1513                 spin_lock(&con->writequeue_lock);
1514                 writequeue_entry_complete(e, ret);
1515         }
1516         spin_unlock(&con->writequeue_lock);
1517 out:
1518         mutex_unlock(&con->sock_mutex);
1519         return;
1520
1521 send_error:
1522         mutex_unlock(&con->sock_mutex);
1523         close_connection(con, false, false, true);
1524         /* Requeue the send work. When the work daemon runs again, it will try
1525            a new connection, then call this function again. */
1526         queue_work(send_workqueue, &con->swork);
1527         return;
1528
1529 out_connect:
1530         mutex_unlock(&con->sock_mutex);
1531         queue_work(send_workqueue, &con->swork);
1532         cond_resched();
1533 }
1534
1535 static void clean_one_writequeue(struct connection *con)
1536 {
1537         struct writequeue_entry *e, *safe;
1538
1539         spin_lock(&con->writequeue_lock);
1540         list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1541                 list_del(&e->list);
1542                 free_entry(e);
1543         }
1544         spin_unlock(&con->writequeue_lock);
1545 }
1546
1547 /* Called from recovery when it knows that a node has
1548    left the cluster */
1549 int dlm_lowcomms_close(int nodeid)
1550 {
1551         struct connection *con;
1552         struct dlm_node_addr *na;
1553
1554         log_print("closing connection to node %d", nodeid);
1555         con = nodeid2con(nodeid, 0);
1556         if (con) {
1557                 set_bit(CF_CLOSE, &con->flags);
1558                 close_connection(con, true, true, true);
1559                 clean_one_writequeue(con);
1560         }
1561
1562         spin_lock(&dlm_node_addrs_spin);
1563         na = find_node_addr(nodeid);
1564         if (na) {
1565                 list_del(&na->list);
1566                 while (na->addr_count--)
1567                         kfree(na->addr[na->addr_count]);
1568                 kfree(na);
1569         }
1570         spin_unlock(&dlm_node_addrs_spin);
1571
1572         return 0;
1573 }
1574
1575 /* Receive workqueue function */
1576 static void process_recv_sockets(struct work_struct *work)
1577 {
1578         struct connection *con = container_of(work, struct connection, rwork);
1579         int err;
1580
1581         clear_bit(CF_READ_PENDING, &con->flags);
1582         do {
1583                 err = con->rx_action(con);
1584         } while (!err);
1585 }
1586
1587 /* Send workqueue function */
1588 static void process_send_sockets(struct work_struct *work)
1589 {
1590         struct connection *con = container_of(work, struct connection, swork);
1591
1592         if (con->sock == NULL) /* not mutex protected so check it inside too */
1593                 con->connect_action(con);
1594         if (!list_empty(&con->writequeue))
1595                 send_to_sock(con);
1596 }
1597
1598
1599 /* Discard all entries on the write queues */
1600 static void clean_writequeues(void)
1601 {
1602         foreach_conn(clean_one_writequeue);
1603 }
1604
1605 static void work_stop(void)
1606 {
1607         destroy_workqueue(recv_workqueue);
1608         destroy_workqueue(send_workqueue);
1609 }
1610
1611 static int work_start(void)
1612 {
1613         recv_workqueue = alloc_workqueue("dlm_recv",
1614                                          WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1615         if (!recv_workqueue) {
1616                 log_print("can't start dlm_recv");
1617                 return -ENOMEM;
1618         }
1619
1620         send_workqueue = alloc_workqueue("dlm_send",
1621                                          WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1622         if (!send_workqueue) {
1623                 log_print("can't start dlm_send");
1624                 destroy_workqueue(recv_workqueue);
1625                 return -ENOMEM;
1626         }
1627
1628         return 0;
1629 }
1630
1631 static void _stop_conn(struct connection *con, bool and_other)
1632 {
1633         mutex_lock(&con->sock_mutex);
1634         set_bit(CF_READ_PENDING, &con->flags);
1635         if (con->sock && con->sock->sk)
1636                 con->sock->sk->sk_user_data = NULL;
1637         if (con->othercon && and_other)
1638                 _stop_conn(con->othercon, false);
1639         mutex_unlock(&con->sock_mutex);
1640 }
1641
1642 static void stop_conn(struct connection *con)
1643 {
1644         _stop_conn(con, true);
1645 }
1646
1647 static void free_conn(struct connection *con)
1648 {
1649         close_connection(con, true, true, true);
1650         if (con->othercon)
1651                 kmem_cache_free(con_cache, con->othercon);
1652         hlist_del(&con->list);
1653         kmem_cache_free(con_cache, con);
1654 }
1655
1656 static void work_flush(void)
1657 {
1658         int ok;
1659         int i;
1660         struct hlist_node *n;
1661         struct connection *con;
1662
1663         flush_workqueue(recv_workqueue);
1664         flush_workqueue(send_workqueue);
1665         do {
1666                 ok = 1;
1667                 foreach_conn(stop_conn);
1668                 flush_workqueue(recv_workqueue);
1669                 flush_workqueue(send_workqueue);
1670                 for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
1671                         hlist_for_each_entry_safe(con, n,
1672                                                   &connection_hash[i], list) {
1673                                 ok &= test_bit(CF_READ_PENDING, &con->flags);
1674                                 if (con->othercon)
1675                                         ok &= test_bit(CF_READ_PENDING,
1676                                                        &con->othercon->flags);
1677                         }
1678                 }
1679         } while (!ok);
1680 }
1681
1682 void dlm_lowcomms_stop(void)
1683 {
1684         /* Set all the flags to prevent any
1685            socket activity.
1686         */
1687         mutex_lock(&connections_lock);
1688         dlm_allow_conn = 0;
1689         mutex_unlock(&connections_lock);
1690         work_flush();
1691         clean_writequeues();
1692         foreach_conn(free_conn);
1693         work_stop();
1694
1695         kmem_cache_destroy(con_cache);
1696 }
1697
1698 int dlm_lowcomms_start(void)
1699 {
1700         int error = -EINVAL;
1701         struct connection *con;
1702         int i;
1703
1704         for (i = 0; i < CONN_HASH_SIZE; i++)
1705                 INIT_HLIST_HEAD(&connection_hash[i]);
1706
1707         init_local();
1708         if (!dlm_local_count) {
1709                 error = -ENOTCONN;
1710                 log_print("no local IP address has been set");
1711                 goto fail;
1712         }
1713
1714         error = -ENOMEM;
1715         con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1716                                       __alignof__(struct connection), 0,
1717                                       NULL);
1718         if (!con_cache)
1719                 goto fail;
1720
1721         error = work_start();
1722         if (error)
1723                 goto fail_destroy;
1724
1725         dlm_allow_conn = 1;
1726
1727         /* Start listening */
1728         if (dlm_config.ci_protocol == 0)
1729                 error = tcp_listen_for_all();
1730         else
1731                 error = sctp_listen_for_all();
1732         if (error)
1733                 goto fail_unlisten;
1734
1735         return 0;
1736
1737 fail_unlisten:
1738         dlm_allow_conn = 0;
1739         con = nodeid2con(0,0);
1740         if (con) {
1741                 close_connection(con, false, true, true);
1742                 kmem_cache_free(con_cache, con);
1743         }
1744 fail_destroy:
1745         kmem_cache_destroy(con_cache);
1746 fail:
1747         return error;
1748 }
1749
1750 void dlm_lowcomms_exit(void)
1751 {
1752         struct dlm_node_addr *na, *safe;
1753
1754         spin_lock(&dlm_node_addrs_spin);
1755         list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1756                 list_del(&na->list);
1757                 while (na->addr_count--)
1758                         kfree(na->addr[na->addr_count]);
1759                 kfree(na);
1760         }
1761         spin_unlock(&dlm_node_addrs_spin);
1762 }