struct mutex sock_mutex;
unsigned long flags;
#define CF_READ_PENDING 1
-#define CF_WRITE_PENDING 2
-#define CF_CONNECT_PENDING 3
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
struct connection *othercon;
struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */
- void (*orig_error_report)(struct sock *);
- void (*orig_data_ready)(struct sock *);
- void (*orig_state_change)(struct sock *);
- void (*orig_write_space)(struct sock *);
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
+static struct listen_sock_callbacks {
+ void (*sk_error_report)(struct sock *);
+ void (*sk_data_ready)(struct sock *);
+ void (*sk_state_change)(struct sock *);
+ void (*sk_write_space)(struct sock *);
+} listen_sock;
+
static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);
clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
}
- if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
- queue_work(send_workqueue, &con->swork);
+ queue_work(send_workqueue, &con->swork);
}
static inline void lowcomms_connect_sock(struct connection *con)
{
if (test_bit(CF_CLOSE, &con->flags))
return;
- if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
- queue_work(send_workqueue, &con->swork);
+ queue_work(send_workqueue, &con->swork);
+ cond_resched();
}
static void lowcomms_state_change(struct sock *sk)
if (con == NULL)
goto out;
- orig_report = con->orig_error_report;
+ orig_report = listen_sock.sk_error_report;
if (con->sock == NULL ||
kernel_getpeername(con->sock, (struct sockaddr *)&saddr, &buflen)) {
printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
}
/* Note: sk_callback_lock must be locked before calling this function. */
-static void save_callbacks(struct connection *con, struct sock *sk)
+static void save_listen_callbacks(struct socket *sock)
{
- con->orig_data_ready = sk->sk_data_ready;
- con->orig_state_change = sk->sk_state_change;
- con->orig_write_space = sk->sk_write_space;
- con->orig_error_report = sk->sk_error_report;
+ struct sock *sk = sock->sk;
+
+ listen_sock.sk_data_ready = sk->sk_data_ready;
+ listen_sock.sk_state_change = sk->sk_state_change;
+ listen_sock.sk_write_space = sk->sk_write_space;
+ listen_sock.sk_error_report = sk->sk_error_report;
}
-static void restore_callbacks(struct connection *con, struct sock *sk)
+static void restore_callbacks(struct socket *sock)
{
+ struct sock *sk = sock->sk;
+
write_lock_bh(&sk->sk_callback_lock);
sk->sk_user_data = NULL;
- sk->sk_data_ready = con->orig_data_ready;
- sk->sk_state_change = con->orig_state_change;
- sk->sk_write_space = con->orig_write_space;
- sk->sk_error_report = con->orig_error_report;
+ sk->sk_data_ready = listen_sock.sk_data_ready;
+ sk->sk_state_change = listen_sock.sk_state_change;
+ sk->sk_write_space = listen_sock.sk_write_space;
+ sk->sk_error_report = listen_sock.sk_error_report;
write_unlock_bh(&sk->sk_callback_lock);
}
con->sock = sock;
sk->sk_user_data = con;
- if (save_cb)
- save_callbacks(con, sk);
/* Install a data_ready callback */
sk->sk_data_ready = lowcomms_data_ready;
sk->sk_write_space = lowcomms_write_space;
static void close_connection(struct connection *con, bool and_other,
bool tx, bool rx)
{
- clear_bit(CF_CONNECT_PENDING, &con->flags);
- clear_bit(CF_WRITE_PENDING, &con->flags);
if (tx && cancel_work_sync(&con->swork))
log_print("canceled swork for node %d", con->nodeid);
if (rx && cancel_work_sync(&con->rwork))
mutex_lock(&con->sock_mutex);
if (con->sock) {
- if (!test_bit(CF_IS_OTHERCON, &con->flags))
- restore_callbacks(con, con->sock->sk);
+ restore_callbacks(con->sock);
sock_release(con->sock);
con->sock = NULL;
}
if (result == 0)
goto out;
-
bind_err:
con->sock = NULL;
sock_release(sock);
con->retries, result);
mutex_unlock(&con->sock_mutex);
msleep(1000);
- clear_bit(CF_CONNECT_PENDING, &con->flags);
lowcomms_connect_sock(con);
return;
}
out:
mutex_unlock(&con->sock_mutex);
- set_bit(CF_WRITE_PENDING, &con->flags);
}
/* Connect a new socket to its peer */
con->retries, result);
mutex_unlock(&con->sock_mutex);
msleep(1000);
- clear_bit(CF_CONNECT_PENDING, &con->flags);
lowcomms_connect_sock(con);
return;
}
out:
mutex_unlock(&con->sock_mutex);
- set_bit(CF_WRITE_PENDING, &con->flags);
return;
}
log_print("Failed to set SO_REUSEADDR on socket: %d", result);
}
sock->sk->sk_user_data = con;
-
+ save_listen_callbacks(sock);
con->rx_action = tcp_accept_from_sock;
con->connect_action = tcp_connect_to_sock;
write_lock_bh(&sock->sk->sk_callback_lock);
/* Init con struct */
sock->sk->sk_user_data = con;
+ save_listen_callbacks(sock);
con->sock = sock;
con->sock->sk->sk_data_ready = lowcomms_data_ready;
con->rx_action = sctp_accept_from_sock;
e->len = e->end - e->offset;
spin_unlock(&con->writequeue_lock);
- if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
- queue_work(send_workqueue, &con->swork);
- }
+ queue_work(send_workqueue, &con->swork);
return;
out:
send_error:
mutex_unlock(&con->sock_mutex);
close_connection(con, false, false, true);
- lowcomms_connect_sock(con);
+ /* Requeue the send work. When the work daemon runs again, it will try
+ a new connection, then call this function again. */
+ queue_work(send_workqueue, &con->swork);
return;
out_connect:
mutex_unlock(&con->sock_mutex);
- lowcomms_connect_sock(con);
+ queue_work(send_workqueue, &con->swork);
+ cond_resched();
}
static void clean_one_writequeue(struct connection *con)
{
struct connection *con = container_of(work, struct connection, swork);
- if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags))
+ if (con->sock == NULL) /* not mutex protected so check it inside too */
con->connect_action(con);
- if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
+ if (!list_empty(&con->writequeue))
send_to_sock(con);
}