return rc;
}
+static void ceph_con_reset_protocol(struct ceph_connection *con)
+{
+ dout("%s con %p\n", __func__, con);
+
+ con_close_socket(con);
+ if (con->in_msg) {
+ WARN_ON(con->in_msg->con != con);
+ ceph_msg_put(con->in_msg);
+ con->in_msg = NULL;
+ }
+ if (con->out_msg) {
+ WARN_ON(con->out_msg->con != con);
+ ceph_msg_put(con->out_msg);
+ con->out_msg = NULL;
+ }
+
+ con->out_skip = 0;
+}
+
/*
* Reset a connection. Discard all incoming and outgoing messages
* and clear *_seq state.
}
}
-static void reset_connection(struct ceph_connection *con)
+static void ceph_con_reset_session(struct ceph_connection *con)
{
- /* reset connection, out_queue, msg_ and connect_seq */
- /* discard existing out_queue and msg_seq */
- dout("reset_connection %p\n", con);
+ dout("%s con %p\n", __func__, con);
+
+ WARN_ON(con->in_msg);
+ WARN_ON(con->out_msg);
ceph_msg_remove_list(&con->out_queue);
ceph_msg_remove_list(&con->out_sent);
-
- if (con->in_msg) {
- BUG_ON(con->in_msg->con != con);
- ceph_msg_put(con->in_msg);
- con->in_msg = NULL;
- }
-
- con->connect_seq = 0;
con->out_seq = 0;
- if (con->out_msg) {
- BUG_ON(con->out_msg->con != con);
- ceph_msg_put(con->out_msg);
- con->out_msg = NULL;
- }
con->in_seq = 0;
con->in_seq_acked = 0;
- con->out_skip = 0;
+ con->connect_seq = 0;
+ con->peer_global_seq = 0;
}
/*
con_flag_clear(con, CON_FLAG_WRITE_PENDING);
con_flag_clear(con, CON_FLAG_BACKOFF);
- reset_connection(con);
- con->peer_global_seq = 0;
+ ceph_con_reset_protocol(con);
+ ceph_con_reset_session(con);
cancel_con(con);
- con_close_socket(con);
mutex_unlock(&con->mutex);
}
EXPORT_SYMBOL(ceph_con_close);
ceph_pr_addr(&con->peer_addr),
sup_feat, server_feat, server_feat & ~sup_feat);
con->error_msg = "missing required protocol features";
- reset_connection(con);
return -1;
case CEPH_MSGR_TAG_BADPROTOVER:
le32_to_cpu(con->out_connect.protocol_version),
le32_to_cpu(con->in_reply.protocol_version));
con->error_msg = "protocol version mismatch";
- reset_connection(con);
return -1;
case CEPH_MSGR_TAG_BADAUTHORIZER:
*/
dout("process_connect got RESET peer seq %u\n",
le32_to_cpu(con->in_reply.connect_seq));
- pr_err("%s%lld %s connection reset\n",
- ENTITY_NAME(con->peer_name),
- ceph_pr_addr(&con->peer_addr));
- reset_connection(con);
+ pr_info("%s%lld %s session reset\n",
+ ENTITY_NAME(con->peer_name),
+ ceph_pr_addr(&con->peer_addr));
+ ceph_con_reset_session(con);
con_out_kvec_reset(con);
ret = prepare_write_connect(con);
if (ret < 0)
/* Tell ceph about it. */
mutex_unlock(&con->mutex);
- pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
if (con->ops->peer_reset)
con->ops->peer_reset(con);
mutex_lock(&con->mutex);
ceph_pr_addr(&con->peer_addr),
req_feat, server_feat, req_feat & ~server_feat);
con->error_msg = "missing required protocol features";
- reset_connection(con);
return -1;
}
break;
dout("got ack for seq %llu type %d at %p\n", seq,
le16_to_cpu(m->hdr.type), m);
- m->ack_stamp = jiffies;
ceph_msg_remove(m);
}
con->in_seq++;
mutex_unlock(&con->mutex);
- dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
+ dout("===== %p %llu from %s%lld %d=%s len %d+%d+%d (%u %u %u) =====\n",
msg, le64_to_cpu(msg->hdr.seq),
ENTITY_NAME(msg->hdr.src),
le16_to_cpu(msg->hdr.type),
ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
le32_to_cpu(msg->hdr.front_len),
+ le32_to_cpu(msg->hdr.middle_len),
le32_to_cpu(msg->hdr.data_len),
con->in_front_crc, con->in_middle_crc, con->in_data_crc);
con->ops->dispatch(con, msg);
return -ENOENT;
}
+ if (delay >= HZ)
+ delay = round_jiffies_relative(delay);
+
dout("%s %p %lu\n", __func__, con, delay);
if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
dout("%s %p - already queued\n", __func__, con);
if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF))
return false;
- ret = queue_con_delay(con, round_jiffies_relative(con->delay));
+ ret = queue_con_delay(con, con->delay);
if (ret) {
dout("%s: con %p FAILED to back off %lu\n", __func__,
con, con->delay);
con->state != CON_STATE_NEGOTIATING &&
con->state != CON_STATE_OPEN);
- con_close_socket(con);
+ ceph_con_reset_protocol(con);
if (con_flag_test(con, CON_FLAG_LOSSYTX)) {
dout("fault on LOSSYTX channel, marking CLOSED\n");
return;
}
- if (con->in_msg) {
- BUG_ON(con->in_msg->con != con);
- ceph_msg_put(con->in_msg);
- con->in_msg = NULL;
- }
- if (con->out_msg) {
- BUG_ON(con->out_msg->con != con);
- ceph_msg_put(con->out_msg);
- con->out_msg = NULL;
- }
-
/* Requeue anything that hasn't been acked */
list_splice_init(&con->out_sent, &con->out_queue);
} else {
/* retry after a delay. */
con->state = CON_STATE_PREOPEN;
- if (con->delay == 0)
+ if (!con->delay) {
con->delay = BASE_DELAY_INTERVAL;
- else if (con->delay < MAX_DELAY_INTERVAL)
+ } else if (con->delay < MAX_DELAY_INTERVAL) {
con->delay *= 2;
+ if (con->delay > MAX_DELAY_INTERVAL)
+ con->delay = MAX_DELAY_INTERVAL;
+ }
con_flag_set(con, CON_FLAG_BACKOFF);
queue_con(con);
}