Merge remote-tracking branches 'regulator/fix/anatop', 'regulator/fix/gpio', 'regulat...
[linux-2.6-microblaze.git] / net / ceph / messenger.c
index e3be1d2..b9b0e3b 100644 (file)
@@ -163,6 +163,7 @@ static struct kmem_cache    *ceph_msg_data_cache;
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
+static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
 
 #ifdef CONFIG_LOCKDEP
 static struct lock_class_key socket_class;
@@ -176,7 +177,7 @@ static struct lock_class_key socket_class;
 
 static void queue_con(struct ceph_connection *con);
 static void cancel_con(struct ceph_connection *con);
-static void con_work(struct work_struct *);
+static void ceph_con_workfn(struct work_struct *);
 static void con_fault(struct ceph_connection *con);
 
 /*
@@ -276,22 +277,22 @@ static void _ceph_msgr_exit(void)
                ceph_msgr_wq = NULL;
        }
 
-       ceph_msgr_slab_exit();
-
        BUG_ON(zero_page == NULL);
        page_cache_release(zero_page);
        zero_page = NULL;
+
+       ceph_msgr_slab_exit();
 }
 
 int ceph_msgr_init(void)
 {
+       if (ceph_msgr_slab_init())
+               return -ENOMEM;
+
        BUG_ON(zero_page != NULL);
        zero_page = ZERO_PAGE(0);
        page_cache_get(zero_page);
 
-       if (ceph_msgr_slab_init())
-               return -ENOMEM;
-
        /*
         * The number of active work items is limited by the number of
         * connections, so leave @max_active at default.
@@ -749,7 +750,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
        mutex_init(&con->mutex);
        INIT_LIST_HEAD(&con->out_queue);
        INIT_LIST_HEAD(&con->out_sent);
-       INIT_DELAYED_WORK(&con->work, con_work);
+       INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
 
        con->state = CON_STATE_CLOSED;
 }
@@ -1351,7 +1352,16 @@ static void prepare_write_keepalive(struct ceph_connection *con)
 {
        dout("prepare_write_keepalive %p\n", con);
        con_out_kvec_reset(con);
-       con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
+       if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
+               struct timespec now = CURRENT_TIME;
+
+               con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
+               ceph_encode_timespec(&con->out_temp_keepalive2, &now);
+               con_out_kvec_add(con, sizeof(con->out_temp_keepalive2),
+                                &con->out_temp_keepalive2);
+       } else {
+               con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
+       }
        con_flag_set(con, CON_FLAG_WRITE_PENDING);
 }
 
@@ -1625,6 +1635,12 @@ static void prepare_read_tag(struct ceph_connection *con)
        con->in_tag = CEPH_MSGR_TAG_READY;
 }
 
+static void prepare_read_keepalive_ack(struct ceph_connection *con)
+{
+       dout("prepare_read_keepalive_ack %p\n", con);
+       con->in_base_pos = 0;
+}
+
 /*
  * Prepare to read a message.
  */
@@ -2322,13 +2338,6 @@ static int read_partial_message(struct ceph_connection *con)
                        return ret;
 
                BUG_ON(!con->in_msg ^ skip);
-               if (con->in_msg && data_len > con->in_msg->data_length) {
-                       pr_warn("%s skipping long message (%u > %zd)\n",
-                               __func__, data_len, con->in_msg->data_length);
-                       ceph_msg_put(con->in_msg);
-                       con->in_msg = NULL;
-                       skip = 1;
-               }
                if (skip) {
                        /* skip this message */
                        dout("alloc_msg said skip message\n");
@@ -2457,6 +2466,17 @@ static void process_message(struct ceph_connection *con)
        mutex_lock(&con->mutex);
 }
 
+static int read_keepalive_ack(struct ceph_connection *con)
+{
+       struct ceph_timespec ceph_ts;
+       size_t size = sizeof(ceph_ts);
+       int ret = read_partial(con, size, size, &ceph_ts);
+       if (ret <= 0)
+               return ret;
+       ceph_decode_timespec(&con->last_keepalive_ack, &ceph_ts);
+       prepare_read_tag(con);
+       return 1;
+}
 
 /*
  * Write something to the socket.  Called in a worker thread when the
@@ -2526,6 +2546,10 @@ more_kvec:
 
 do_next:
        if (con->state == CON_STATE_OPEN) {
+               if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
+                       prepare_write_keepalive(con);
+                       goto more;
+               }
                /* is anything else pending? */
                if (!list_empty(&con->out_queue)) {
                        prepare_write_message(con);
@@ -2535,10 +2559,6 @@ do_next:
                        prepare_write_ack(con);
                        goto more;
                }
-               if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
-                       prepare_write_keepalive(con);
-                       goto more;
-               }
        }
 
        /* Nothing to do! */
@@ -2641,6 +2661,9 @@ more:
                case CEPH_MSGR_TAG_ACK:
                        prepare_read_ack(con);
                        break;
+               case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+                       prepare_read_keepalive_ack(con);
+                       break;
                case CEPH_MSGR_TAG_CLOSE:
                        con_close_socket(con);
                        con->state = CON_STATE_CLOSED;
@@ -2684,6 +2707,12 @@ more:
                process_ack(con);
                goto more;
        }
+       if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
+               ret = read_keepalive_ack(con);
+               if (ret <= 0)
+                       goto out;
+               goto more;
+       }
 
 out:
        dout("try_read done on %p ret %d\n", con, ret);
@@ -2799,7 +2828,7 @@ static void con_fault_finish(struct ceph_connection *con)
 /*
  * Do some work on a connection.  Drop a connection ref when we're done.
  */
-static void con_work(struct work_struct *work)
+static void ceph_con_workfn(struct work_struct *work)
 {
        struct ceph_connection *con = container_of(work, struct ceph_connection,
                                                   work.work);
@@ -3101,6 +3130,20 @@ void ceph_con_keepalive(struct ceph_connection *con)
 }
 EXPORT_SYMBOL(ceph_con_keepalive);
 
+bool ceph_con_keepalive_expired(struct ceph_connection *con,
+                              unsigned long interval)
+{
+       if (interval > 0 &&
+           (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+               struct timespec now = CURRENT_TIME;
+               struct timespec ts;
+               jiffies_to_timespec(interval, &ts);
+               ts = timespec_add(con->last_keepalive_ack, ts);
+               return timespec_compare(&now, &ts) >= 0;
+       }
+       return false;
+}
+
 static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
 {
        struct ceph_msg_data *data;