rxrpc: Rewrite the data and ack handling code
[linux-2.6-microblaze.git] / net / rxrpc / call_object.c
index d233adc..18ab13f 100644 (file)
@@ -30,7 +30,6 @@ const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
        [RXRPC_CALL_CLIENT_SEND_REQUEST]        = "ClSndReq",
        [RXRPC_CALL_CLIENT_AWAIT_REPLY]         = "ClAwtRpl",
        [RXRPC_CALL_CLIENT_RECV_REPLY]          = "ClRcvRpl",
-       [RXRPC_CALL_CLIENT_FINAL_ACK]           = "ClFnlACK",
        [RXRPC_CALL_SERVER_PREALLOC]            = "SvPrealc",
        [RXRPC_CALL_SERVER_SECURING]            = "SvSecure",
        [RXRPC_CALL_SERVER_ACCEPTING]           = "SvAccept",
@@ -43,7 +42,6 @@ const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
 
 const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = {
        [RXRPC_CALL_SUCCEEDED]                  = "Complete",
-       [RXRPC_CALL_SERVER_BUSY]                = "SvBusy  ",
        [RXRPC_CALL_REMOTELY_ABORTED]           = "RmtAbort",
        [RXRPC_CALL_LOCALLY_ABORTED]            = "LocAbort",
        [RXRPC_CALL_LOCAL_ERROR]                = "LocError",
@@ -57,10 +55,8 @@ const char rxrpc_call_traces[rxrpc_call__nr_trace][4] = {
        [rxrpc_call_queued_ref]         = "QUR",
        [rxrpc_call_seen]               = "SEE",
        [rxrpc_call_got]                = "GOT",
-       [rxrpc_call_got_skb]            = "Gsk",
        [rxrpc_call_got_userid]         = "Gus",
        [rxrpc_call_put]                = "PUT",
-       [rxrpc_call_put_skb]            = "Psk",
        [rxrpc_call_put_userid]         = "Pus",
        [rxrpc_call_put_noqueue]        = "PNQ",
 };
@@ -69,9 +65,15 @@ struct kmem_cache *rxrpc_call_jar;
 LIST_HEAD(rxrpc_calls);
 DEFINE_RWLOCK(rxrpc_call_lock);
 
-static void rxrpc_call_life_expired(unsigned long _call);
-static void rxrpc_ack_time_expired(unsigned long _call);
-static void rxrpc_resend_time_expired(unsigned long _call);
+static void rxrpc_call_timer_expired(unsigned long _call)
+{
+       struct rxrpc_call *call = (struct rxrpc_call *)_call;
+
+       _enter("%d", call->debug_id);
+
+       if (call->state < RXRPC_CALL_COMPLETE)
+               rxrpc_queue_call(call);
+}
 
 /*
  * find an extant server call
@@ -121,27 +123,24 @@ struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
        if (!call)
                return NULL;
 
-       call->acks_winsz = 16;
-       call->acks_window = kmalloc(call->acks_winsz * sizeof(unsigned long),
+       call->rxtx_buffer = kcalloc(RXRPC_RXTX_BUFF_SIZE,
+                                   sizeof(struct sk_buff *),
                                    gfp);
-       if (!call->acks_window) {
-               kmem_cache_free(rxrpc_call_jar, call);
-               return NULL;
-       }
+       if (!call->rxtx_buffer)
+               goto nomem;
 
-       setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
-                   (unsigned long) call);
-       setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
-                   (unsigned long) call);
-       setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
-                   (unsigned long) call);
+       call->rxtx_annotations = kcalloc(RXRPC_RXTX_BUFF_SIZE, sizeof(u8), gfp);
+       if (!call->rxtx_annotations)
+               goto nomem_2;
+
+       setup_timer(&call->timer, rxrpc_call_timer_expired,
+                   (unsigned long)call);
        INIT_WORK(&call->processor, &rxrpc_process_call);
        INIT_LIST_HEAD(&call->link);
        INIT_LIST_HEAD(&call->chan_wait_link);
        INIT_LIST_HEAD(&call->accept_link);
-       skb_queue_head_init(&call->rx_queue);
-       skb_queue_head_init(&call->rx_oos_queue);
-       skb_queue_head_init(&call->knlrecv_queue);
+       INIT_LIST_HEAD(&call->recvmsg_link);
+       INIT_LIST_HEAD(&call->sock_link);
        init_waitqueue_head(&call->waitq);
        spin_lock_init(&call->lock);
        rwlock_init(&call->state_lock);
@@ -150,63 +149,52 @@ struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
 
        memset(&call->sock_node, 0xed, sizeof(call->sock_node));
 
-       call->rx_data_expect = 1;
-       call->rx_data_eaten = 0;
-       call->rx_first_oos = 0;
-       call->ackr_win_top = call->rx_data_eaten + 1 + rxrpc_rx_window_size;
-       call->creation_jif = jiffies;
+       /* Leave space in the ring to handle a maxed-out jumbo packet */
+       call->rx_winsize = RXRPC_RXTX_BUFF_SIZE - 1 - 46;
+       call->tx_winsize = 16;
+       call->rx_expect_next = 1;
        return call;
+
+nomem_2:
+       kfree(call->rxtx_buffer);
+nomem:
+       kmem_cache_free(rxrpc_call_jar, call);
+       return NULL;
 }
 
 /*
  * Allocate a new client call.
  */
-static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
-                                                 struct sockaddr_rxrpc *srx,
+static struct rxrpc_call *rxrpc_alloc_client_call(struct sockaddr_rxrpc *srx,
                                                  gfp_t gfp)
 {
        struct rxrpc_call *call;
 
        _enter("");
 
-       ASSERT(rx->local != NULL);
-
        call = rxrpc_alloc_call(gfp);
        if (!call)
                return ERR_PTR(-ENOMEM);
        call->state = RXRPC_CALL_CLIENT_AWAIT_CONN;
-       call->rx_data_post = 1;
        call->service_id = srx->srx_service;
-       rcu_assign_pointer(call->socket, rx);
 
        _leave(" = %p", call);
        return call;
 }
 
 /*
- * Begin client call.
+ * Initiate the call ack/resend/expiry timer.
  */
-static int rxrpc_begin_client_call(struct rxrpc_call *call,
-                                  struct rxrpc_conn_parameters *cp,
-                                  struct sockaddr_rxrpc *srx,
-                                  gfp_t gfp)
+static void rxrpc_start_call_timer(struct rxrpc_call *call)
 {
-       int ret;
-
-       /* Set up or get a connection record and set the protocol parameters,
-        * including channel number and call ID.
-        */
-       ret = rxrpc_connect_call(call, cp, srx, gfp);
-       if (ret < 0)
-               return ret;
-
-       spin_lock(&call->conn->params.peer->lock);
-       hlist_add_head(&call->error_link, &call->conn->params.peer->error_targets);
-       spin_unlock(&call->conn->params.peer->lock);
-
-       call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
-       add_timer(&call->lifetimer);
-       return 0;
+       unsigned long expire_at;
+
+       expire_at = jiffies + rxrpc_max_call_lifetime;
+       call->expire_at = expire_at;
+       call->ack_at = expire_at;
+       call->resend_at = expire_at;
+       call->timer.expires = expire_at;
+       add_timer(&call->timer);
 }
 
 /*
@@ -226,7 +214,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 
        _enter("%p,%lx", rx, user_call_ID);
 
-       call = rxrpc_alloc_client_call(rx, srx, gfp);
+       call = rxrpc_alloc_client_call(srx, gfp);
        if (IS_ERR(call)) {
                _leave(" = %ld", PTR_ERR(call));
                return call;
@@ -255,19 +243,32 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
                        goto found_user_ID_now_present;
        }
 
+       rcu_assign_pointer(call->socket, rx);
        rxrpc_get_call(call, rxrpc_call_got_userid);
        rb_link_node(&call->sock_node, parent, pp);
        rb_insert_color(&call->sock_node, &rx->calls);
+       list_add(&call->sock_link, &rx->sock_calls);
+
        write_unlock(&rx->call_lock);
 
-       write_lock_bh(&rxrpc_call_lock);
+       write_lock(&rxrpc_call_lock);
        list_add_tail(&call->link, &rxrpc_calls);
-       write_unlock_bh(&rxrpc_call_lock);
+       write_unlock(&rxrpc_call_lock);
 
-       ret = rxrpc_begin_client_call(call, cp, srx, gfp);
+       /* Set up or get a connection record and set the protocol parameters,
+        * including channel number and call ID.
+        */
+       ret = rxrpc_connect_call(call, cp, srx, gfp);
        if (ret < 0)
                goto error;
 
+       spin_lock_bh(&call->conn->params.peer->lock);
+       hlist_add_head(&call->error_link,
+                      &call->conn->params.peer->error_targets);
+       spin_unlock_bh(&call->conn->params.peer->lock);
+
+       rxrpc_start_call_timer(call);
+
        _net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
 
        _leave(" = %p [new]", call);
@@ -279,9 +280,9 @@ error:
        write_unlock(&rx->call_lock);
        rxrpc_put_call(call, rxrpc_call_put_userid);
 
-       write_lock_bh(&rxrpc_call_lock);
+       write_lock(&rxrpc_call_lock);
        list_del_init(&call->link);
-       write_unlock_bh(&rxrpc_call_lock);
+       write_unlock(&rxrpc_call_lock);
 
 error_out:
        __rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
@@ -303,142 +304,46 @@ found_user_ID_now_present:
 }
 
 /*
- * set up an incoming call
- * - called in process context with IRQs enabled
+ * Set up an incoming call.  call->conn points to the connection.
+ * This is called in BH context and isn't allowed to fail.
  */
-struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
-                                      struct rxrpc_connection *conn,
-                                      struct sk_buff *skb)
+void rxrpc_incoming_call(struct rxrpc_sock *rx,
+                        struct rxrpc_call *call,
+                        struct sk_buff *skb)
 {
+       struct rxrpc_connection *conn = call->conn;
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
-       struct rxrpc_call *call, *candidate;
-       const void *here = __builtin_return_address(0);
-       u32 call_id, chan;
-
-       _enter(",%d", conn->debug_id);
-
-       ASSERT(rx != NULL);
-
-       candidate = rxrpc_alloc_call(GFP_NOIO);
-       if (!candidate)
-               return ERR_PTR(-EBUSY);
+       u32 chan;
 
-       trace_rxrpc_call(candidate, rxrpc_call_new_service,
-                        atomic_read(&candidate->usage), here, NULL);
+       _enter(",%d", call->conn->debug_id);
 
-       chan = sp->hdr.cid & RXRPC_CHANNELMASK;
-       candidate->conn         = conn;
-       candidate->peer         = conn->params.peer;
-       candidate->cid          = sp->hdr.cid;
-       candidate->call_id      = sp->hdr.callNumber;
-       candidate->security_ix  = sp->hdr.securityIndex;
-       candidate->rx_data_post = 0;
-       candidate->state        = RXRPC_CALL_SERVER_ACCEPTING;
-       candidate->flags        |= (1 << RXRPC_CALL_IS_SERVICE);
-       if (conn->security_ix > 0)
-               candidate->state = RXRPC_CALL_SERVER_SECURING;
-       rcu_assign_pointer(candidate->socket, rx);
-
-       spin_lock(&conn->channel_lock);
-
-       /* set the channel for this call */
-       call = rcu_dereference_protected(conn->channels[chan].call,
-                                        lockdep_is_held(&conn->channel_lock));
-
-       _debug("channel[%u] is %p", candidate->cid & RXRPC_CHANNELMASK, call);
-       if (call && call->call_id == sp->hdr.callNumber) {
-               /* already set; must've been a duplicate packet */
-               _debug("extant call [%d]", call->state);
-               ASSERTCMP(call->conn, ==, conn);
-
-               read_lock(&call->state_lock);
-               switch (call->state) {
-               case RXRPC_CALL_LOCALLY_ABORTED:
-                       if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events))
-                               rxrpc_queue_call(call);
-               case RXRPC_CALL_REMOTELY_ABORTED:
-                       read_unlock(&call->state_lock);
-                       goto aborted_call;
-               default:
-                       rxrpc_get_call(call, rxrpc_call_got);
-                       read_unlock(&call->state_lock);
-                       goto extant_call;
-               }
-       }
-
-       if (call) {
-               /* it seems the channel is still in use from the previous call
-                * - ditch the old binding if its call is now complete */
-               _debug("CALL: %u { %s }",
-                      call->debug_id, rxrpc_call_states[call->state]);
-
-               if (call->state == RXRPC_CALL_COMPLETE) {
-                       __rxrpc_disconnect_call(conn, call);
-               } else {
-                       spin_unlock(&conn->channel_lock);
-                       kmem_cache_free(rxrpc_call_jar, candidate);
-                       _leave(" = -EBUSY");
-                       return ERR_PTR(-EBUSY);
-               }
-       }
-
-       /* check the call number isn't duplicate */
-       _debug("check dup");
-       call_id = sp->hdr.callNumber;
-
-       /* We just ignore calls prior to the current call ID.  Terminated calls
-        * are handled via the connection.
+       rcu_assign_pointer(call->socket, rx);
+       call->call_id           = sp->hdr.callNumber;
+       call->service_id        = sp->hdr.serviceId;
+       call->cid               = sp->hdr.cid;
+       call->state             = RXRPC_CALL_SERVER_ACCEPTING;
+       if (sp->hdr.securityIndex > 0)
+               call->state     = RXRPC_CALL_SERVER_SECURING;
+
+       /* Set the channel for this call.  We don't get channel_lock as we're
+        * only defending against the data_ready handler (which we're called
+        * from) and the RESPONSE packet parser (which is only really
+        * interested in call_counter and can cope with a disagreement with the
+        * call pointer).
         */
-       if (call_id <= conn->channels[chan].call_counter)
-               goto old_call; /* TODO: Just drop packet */
-
-       /* Temporary: Mirror the backlog prealloc ref (TODO: use prealloc) */
-       rxrpc_get_call(candidate, rxrpc_call_got);
-
-       /* make the call available */
-       _debug("new call");
-       call = candidate;
-       candidate = NULL;
-       conn->channels[chan].call_counter = call_id;
+       chan = sp->hdr.cid & RXRPC_CHANNELMASK;
+       conn->channels[chan].call_counter = call->call_id;
+       conn->channels[chan].call_id = call->call_id;
        rcu_assign_pointer(conn->channels[chan].call, call);
-       rxrpc_get_connection(conn);
-       rxrpc_get_peer(call->peer);
-       spin_unlock(&conn->channel_lock);
 
        spin_lock(&conn->params.peer->lock);
        hlist_add_head(&call->error_link, &conn->params.peer->error_targets);
        spin_unlock(&conn->params.peer->lock);
 
-       write_lock_bh(&rxrpc_call_lock);
-       list_add_tail(&call->link, &rxrpc_calls);
-       write_unlock_bh(&rxrpc_call_lock);
-
-       call->service_id = conn->params.service_id;
-
        _net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
 
-       call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
-       add_timer(&call->lifetimer);
-       _leave(" = %p {%d} [new]", call, call->debug_id);
-       return call;
-
-extant_call:
-       spin_unlock(&conn->channel_lock);
-       kmem_cache_free(rxrpc_call_jar, candidate);
-       _leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1);
-       return call;
-
-aborted_call:
-       spin_unlock(&conn->channel_lock);
-       kmem_cache_free(rxrpc_call_jar, candidate);
-       _leave(" = -ECONNABORTED");
-       return ERR_PTR(-ECONNABORTED);
-
-old_call:
-       spin_unlock(&conn->channel_lock);
-       kmem_cache_free(rxrpc_call_jar, candidate);
-       _leave(" = -ECONNRESET [old]");
-       return ERR_PTR(-ECONNRESET);
+       rxrpc_start_call_timer(call);
+       _leave("");
 }
 
 /*
@@ -497,25 +402,17 @@ void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
 }
 
 /*
- * Note the addition of a ref on a call for a socket buffer.
+ * Detach a call from its owning socket.
  */
-void rxrpc_get_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
+void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 {
-       const void *here = __builtin_return_address(0);
-       int n = atomic_inc_return(&call->usage);
+       struct rxrpc_connection *conn = call->conn;
+       bool put = false;
+       int i;
 
-       trace_rxrpc_call(call, rxrpc_call_got_skb, n, here, skb);
-}
+       _enter("{%d,%d}", call->debug_id, atomic_read(&call->usage));
 
-/*
- * detach a call from a socket and set up for release
- */
-void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
-{
-       _enter("{%d,%d,%d,%d}",
-              call->debug_id, atomic_read(&call->usage),
-              atomic_read(&call->ackr_not_idle),
-              call->rx_first_oos);
+       ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
 
        rxrpc_see_call(call);
 
@@ -524,80 +421,46 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
                BUG();
        spin_unlock_bh(&call->lock);
 
-       /* dissociate from the socket
-        * - the socket's ref on the call is passed to the death timer
-        */
-       _debug("RELEASE CALL %p (%d)", call, call->debug_id);
+       del_timer_sync(&call->timer);
 
-       if (call->peer) {
-               spin_lock(&call->peer->lock);
-               hlist_del_init(&call->error_link);
-               spin_unlock(&call->peer->lock);
-       }
+       /* Make sure we don't get any more notifications */
+       write_lock_bh(&rx->recvmsg_lock);
 
-       write_lock_bh(&rx->call_lock);
-       if (!list_empty(&call->accept_link)) {
+       if (!list_empty(&call->recvmsg_link)) {
                _debug("unlinking once-pending call %p { e=%lx f=%lx }",
                       call, call->events, call->flags);
-               ASSERT(!test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
-               list_del_init(&call->accept_link);
-               sk_acceptq_removed(&rx->sk);
-       } else if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
+               list_del(&call->recvmsg_link);
+               put = true;
+       }
+
+       /* list_empty() must return false in rxrpc_notify_socket() */
+       call->recvmsg_link.next = NULL;
+       call->recvmsg_link.prev = NULL;
+
+       write_unlock_bh(&rx->recvmsg_lock);
+       if (put)
+               rxrpc_put_call(call, rxrpc_call_put);
+
+       write_lock(&rx->call_lock);
+
+       if (test_and_clear_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
                rb_erase(&call->sock_node, &rx->calls);
                memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
-               clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
                rxrpc_put_call(call, rxrpc_call_put_userid);
        }
-       write_unlock_bh(&rx->call_lock);
-
-       /* free up the channel for reuse */
-       if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK) {
-               clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
-               rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ACK);
-               rxrpc_call_completed(call);
-       } else {
-               write_lock_bh(&call->state_lock);
-
-               if (call->state < RXRPC_CALL_COMPLETE) {
-                       _debug("+++ ABORTING STATE %d +++\n", call->state);
-                       __rxrpc_abort_call("SKT", call, 0, RX_CALL_DEAD, ECONNRESET);
-                       clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events);
-                       rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ABORT);
-               }
-
-               write_unlock_bh(&call->state_lock);
-       }
 
-       if (call->conn)
+       list_del(&call->sock_link);
+       write_unlock(&rx->call_lock);
+
+       _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
+
+       if (conn)
                rxrpc_disconnect_call(call);
 
-       /* clean up the Rx queue */
-       if (!skb_queue_empty(&call->rx_queue) ||
-           !skb_queue_empty(&call->rx_oos_queue)) {
-               struct rxrpc_skb_priv *sp;
-               struct sk_buff *skb;
-
-               _debug("purge Rx queues");
-
-               spin_lock_bh(&call->lock);
-               while ((skb = skb_dequeue(&call->rx_queue)) ||
-                      (skb = skb_dequeue(&call->rx_oos_queue))) {
-                       spin_unlock_bh(&call->lock);
-
-                       sp = rxrpc_skb(skb);
-                       _debug("- zap %s %%%u #%u",
-                              rxrpc_pkts[sp->hdr.type],
-                              sp->hdr.serial, sp->hdr.seq);
-                       rxrpc_free_skb(skb);
-                       spin_lock_bh(&call->lock);
-               }
-               spin_unlock_bh(&call->lock);
+       for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++) {
+               rxrpc_free_skb(call->rxtx_buffer[i]);
+               call->rxtx_buffer[i] = NULL;
        }
-       rxrpc_purge_queue(&call->knlrecv_queue);
-
-       del_timer_sync(&call->resend_timer);
-       del_timer_sync(&call->ack_timer);
-       del_timer_sync(&call->lifetimer);
 
        /* We have to release the prealloc backlog ref */
        if (rxrpc_is_service_call(call))
@@ -611,28 +474,19 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
 {
        struct rxrpc_call *call;
-       struct rb_node *p;
 
        _enter("%p", rx);
 
-       read_lock_bh(&rx->call_lock);
-
-       /* kill the not-yet-accepted incoming calls */
-       list_for_each_entry(call, &rx->secureq, accept_link) {
-               rxrpc_release_call(rx, call);
-       }
-
-       list_for_each_entry(call, &rx->acceptq, accept_link) {
-               rxrpc_release_call(rx, call);
-       }
-
-       /* mark all the calls as no longer wanting incoming packets */
-       for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
-               call = rb_entry(p, struct rxrpc_call, sock_node);
+       while (!list_empty(&rx->sock_calls)) {
+               call = list_entry(rx->sock_calls.next,
+                                 struct rxrpc_call, sock_link);
+               rxrpc_get_call(call, rxrpc_call_got);
+               rxrpc_abort_call("SKT", call, 0, RX_CALL_DEAD, ECONNRESET);
+               rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ABORT);
                rxrpc_release_call(rx, call);
+               rxrpc_put_call(call, rxrpc_call_put);
        }
 
-       read_unlock_bh(&rx->call_lock);
        _leave("");
 }
 
@@ -651,23 +505,12 @@ void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
        ASSERTCMP(n, >=, 0);
        if (n == 0) {
                _debug("call %d dead", call->debug_id);
-               rxrpc_cleanup_call(call);
-       }
-}
+               ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
 
-/*
- * Release a call ref held by a socket buffer.
- */
-void rxrpc_put_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb)
-{
-       const void *here = __builtin_return_address(0);
-       int n;
+               write_lock(&rxrpc_call_lock);
+               list_del_init(&call->link);
+               write_unlock(&rxrpc_call_lock);
 
-       n = atomic_dec_return(&call->usage);
-       trace_rxrpc_call(call, rxrpc_call_put_skb, n, here, skb);
-       ASSERTCMP(n, >=, 0);
-       if (n == 0) {
-               _debug("call %d dead", call->debug_id);
                rxrpc_cleanup_call(call);
        }
 }
@@ -679,9 +522,9 @@ static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
 {
        struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
 
-       rxrpc_purge_queue(&call->rx_queue);
-       rxrpc_purge_queue(&call->knlrecv_queue);
        rxrpc_put_peer(call->peer);
+       kfree(call->rxtx_buffer);
+       kfree(call->rxtx_annotations);
        kmem_cache_free(rxrpc_call_jar, call);
 }
 
@@ -690,49 +533,24 @@ static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
  */
 void rxrpc_cleanup_call(struct rxrpc_call *call)
 {
-       _net("DESTROY CALL %d", call->debug_id);
+       int i;
 
-       write_lock_bh(&rxrpc_call_lock);
-       list_del_init(&call->link);
-       write_unlock_bh(&rxrpc_call_lock);
+       _net("DESTROY CALL %d", call->debug_id);
 
        memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
 
-       del_timer_sync(&call->lifetimer);
-       del_timer_sync(&call->ack_timer);
-       del_timer_sync(&call->resend_timer);
+       del_timer_sync(&call->timer);
 
        ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
        ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
-       ASSERT(!work_pending(&call->processor));
        ASSERTCMP(call->conn, ==, NULL);
 
-       if (call->acks_window) {
-               _debug("kill Tx window %d",
-                      CIRC_CNT(call->acks_head, call->acks_tail,
-                               call->acks_winsz));
-               smp_mb();
-               while (CIRC_CNT(call->acks_head, call->acks_tail,
-                               call->acks_winsz) > 0) {
-                       struct rxrpc_skb_priv *sp;
-                       unsigned long _skb;
-
-                       _skb = call->acks_window[call->acks_tail] & ~1;
-                       sp = rxrpc_skb((struct sk_buff *)_skb);
-                       _debug("+++ clear Tx %u", sp->hdr.seq);
-                       rxrpc_free_skb((struct sk_buff *)_skb);
-                       call->acks_tail =
-                               (call->acks_tail + 1) & (call->acks_winsz - 1);
-               }
-
-               kfree(call->acks_window);
-       }
+       /* Clean up the Rx/Tx buffer */
+       for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++)
+               rxrpc_free_skb(call->rxtx_buffer[i]);
 
        rxrpc_free_skb(call->tx_pending);
 
-       rxrpc_purge_queue(&call->rx_queue);
-       ASSERT(skb_queue_empty(&call->rx_oos_queue));
-       rxrpc_purge_queue(&call->knlrecv_queue);
        call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
 }
 
@@ -747,8 +565,8 @@ void __exit rxrpc_destroy_all_calls(void)
 
        if (list_empty(&rxrpc_calls))
                return;
-       
-       write_lock_bh(&rxrpc_call_lock);
+
+       write_lock(&rxrpc_call_lock);
 
        while (!list_empty(&rxrpc_calls)) {
                call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
@@ -757,74 +575,15 @@ void __exit rxrpc_destroy_all_calls(void)
                rxrpc_see_call(call);
                list_del_init(&call->link);
 
-               pr_err("Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
+               pr_err("Call %p still in use (%d,%s,%lx,%lx)!\n",
                       call, atomic_read(&call->usage),
-                      atomic_read(&call->ackr_not_idle),
                       rxrpc_call_states[call->state],
                       call->flags, call->events);
-               if (!skb_queue_empty(&call->rx_queue))
-                       pr_err("Rx queue occupied\n");
-               if (!skb_queue_empty(&call->rx_oos_queue))
-                       pr_err("OOS queue occupied\n");
 
-               write_unlock_bh(&rxrpc_call_lock);
+               write_unlock(&rxrpc_call_lock);
                cond_resched();
-               write_lock_bh(&rxrpc_call_lock);
+               write_lock(&rxrpc_call_lock);
        }
 
-       write_unlock_bh(&rxrpc_call_lock);
-       _leave("");
-}
-
-/*
- * handle call lifetime being exceeded
- */
-static void rxrpc_call_life_expired(unsigned long _call)
-{
-       struct rxrpc_call *call = (struct rxrpc_call *) _call;
-
-       _enter("{%d}", call->debug_id);
-
-       rxrpc_see_call(call);
-       if (call->state >= RXRPC_CALL_COMPLETE)
-               return;
-
-       set_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events);
-       rxrpc_queue_call(call);
-}
-
-/*
- * handle resend timer expiry
- * - may not take call->state_lock as this can deadlock against del_timer_sync()
- */
-static void rxrpc_resend_time_expired(unsigned long _call)
-{
-       struct rxrpc_call *call = (struct rxrpc_call *) _call;
-
-       _enter("{%d}", call->debug_id);
-
-       rxrpc_see_call(call);
-       if (call->state >= RXRPC_CALL_COMPLETE)
-               return;
-
-       clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
-       if (!test_and_set_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events))
-               rxrpc_queue_call(call);
-}
-
-/*
- * handle ACK timer expiry
- */
-static void rxrpc_ack_time_expired(unsigned long _call)
-{
-       struct rxrpc_call *call = (struct rxrpc_call *) _call;
-
-       _enter("{%d}", call->debug_id);
-
-       rxrpc_see_call(call);
-       if (call->state >= RXRPC_CALL_COMPLETE)
-               return;
-
-       if (!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
-               rxrpc_queue_call(call);
+       write_unlock(&rxrpc_call_lock);
 }