summaryrefslogtreecommitdiff
path: root/net/rxrpc/io_thread.c
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2024-12-04 07:46:41 +0000
committerJakub Kicinski <kuba@kernel.org>2024-12-09 13:48:26 -0800
commit9e3cccd176b5ec6ff78693287fb03097e453e69c (patch)
treeb68272839cee48dca39bbfeaee3f4ea0f60065a6 /net/rxrpc/io_thread.c
parent149d002bee706f51772bd320cda90c922844bb0e (diff)
rxrpc: Fix CPU time starvation in I/O thread
Starvation can happen in the rxrpc I/O thread because it goes back to the top of the I/O loop after it does any one thing without trying to give any other connection or call CPU time. Also, because it processes one call packet at a time, it tries to do the retransmission loop after each ACK without checking to see if there are other ACKs already in the queue that can update the SACK state. Fix this by: (1) Add a received-packet queue on each call. (2) Distribute packets from the master Rx queue to the individual call, conn and error queues and 'poking' calls to add them to the attend queue first thing in the I/O thread. (3) Go through all the attention-seeking connections and calls before going back to the top of the I/O thread. Each queue is extracted as a whole and then gone through so that new additions to insert themselves into the queue. (4) Make the call event handler go through all the packets currently on the call's rx_queue before transmitting and retransmitting DATA packets. (5) Drop the skb argument from the call event handler as this is now replaced with the rx_queue. Instead, keep track of whether we received a packet or an ACK for the tests that used to rely on that. Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org Link: https://patch.msgid.link/20241204074710.990092-14-dhowells@redhat.com Signed-off-by: Jakub Kicinski <kuba@kernel.org>
Diffstat (limited to 'net/rxrpc/io_thread.c')
-rw-r--r--net/rxrpc/io_thread.c104
1 files changed, 53 insertions, 51 deletions
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index bd6d4f5e97b4..bc678a299bd8 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -338,7 +338,6 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
struct rxrpc_channel *chan;
struct rxrpc_call *call = NULL;
unsigned int channel;
- bool ret;
if (sp->hdr.securityIndex != conn->security_ix)
return rxrpc_direct_abort(skb, rxrpc_eproto_wrong_security,
@@ -425,9 +424,9 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
peer_srx, skb);
}
- ret = rxrpc_input_call_event(call, skb);
+ rxrpc_queue_rx_call_packet(call, skb);
rxrpc_put_call(call, rxrpc_call_put_input);
- return ret;
+ return true;
}
/*
@@ -444,6 +443,8 @@ int rxrpc_io_thread(void *data)
ktime_t now;
#endif
bool should_stop;
+ LIST_HEAD(conn_attend_q);
+ LIST_HEAD(call_attend_q);
complete(&local->io_thread_ready);
@@ -454,43 +455,25 @@ int rxrpc_io_thread(void *data)
for (;;) {
rxrpc_inc_stat(local->rxnet, stat_io_loop);
- /* Deal with connections that want immediate attention. */
- conn = list_first_entry_or_null(&local->conn_attend_q,
- struct rxrpc_connection,
- attend_link);
- if (conn) {
- spin_lock_bh(&local->lock);
- list_del_init(&conn->attend_link);
- spin_unlock_bh(&local->lock);
-
- rxrpc_input_conn_event(conn, NULL);
- rxrpc_put_connection(conn, rxrpc_conn_put_poke);
- continue;
+ /* Inject a delay into packets if requested. */
+#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
+ now = ktime_get_real();
+ while ((skb = skb_peek(&local->rx_delay_queue))) {
+ if (ktime_before(now, skb->tstamp))
+ break;
+ skb = skb_dequeue(&local->rx_delay_queue);
+ skb_queue_tail(&local->rx_queue, skb);
}
+#endif
- if (test_and_clear_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
- &local->client_conn_flags))
- rxrpc_discard_expired_client_conns(local);
-
- /* Deal with calls that want immediate attention. */
- if ((call = list_first_entry_or_null(&local->call_attend_q,
- struct rxrpc_call,
- attend_link))) {
- spin_lock_bh(&local->lock);
- list_del_init(&call->attend_link);
- spin_unlock_bh(&local->lock);
-
- trace_rxrpc_call_poked(call);
- rxrpc_input_call_event(call, NULL);
- rxrpc_put_call(call, rxrpc_call_put_poke);
- continue;
+ if (!skb_queue_empty(&local->rx_queue)) {
+ spin_lock_irq(&local->rx_queue.lock);
+ skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
+ spin_unlock_irq(&local->rx_queue.lock);
}
- if (!list_empty(&local->new_client_calls))
- rxrpc_connect_client_calls(local);
-
- /* Process received packets and errors. */
- if ((skb = __skb_dequeue(&rx_queue))) {
+ /* Distribute packets and errors. */
+ while ((skb = __skb_dequeue(&rx_queue))) {
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
switch (skb->mark) {
case RXRPC_SKB_MARK_PACKET:
@@ -514,27 +497,46 @@ int rxrpc_io_thread(void *data)
rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
break;
}
- continue;
}
- /* Inject a delay into packets if requested. */
-#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
- now = ktime_get_real();
- while ((skb = skb_peek(&local->rx_delay_queue))) {
- if (ktime_before(now, skb->tstamp))
- break;
- skb = skb_dequeue(&local->rx_delay_queue);
- skb_queue_tail(&local->rx_queue, skb);
+ /* Deal with connections that want immediate attention. */
+ spin_lock_bh(&local->lock);
+ list_splice_tail_init(&local->conn_attend_q, &conn_attend_q);
+ spin_unlock_bh(&local->lock);
+
+ while ((conn = list_first_entry_or_null(&conn_attend_q,
+ struct rxrpc_connection,
+ attend_link))) {
+ spin_lock_bh(&local->lock);
+ list_del_init(&conn->attend_link);
+ spin_unlock_bh(&local->lock);
+ rxrpc_input_conn_event(conn, NULL);
+ rxrpc_put_connection(conn, rxrpc_conn_put_poke);
}
-#endif
- if (!skb_queue_empty(&local->rx_queue)) {
- spin_lock_irq(&local->rx_queue.lock);
- skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
- spin_unlock_irq(&local->rx_queue.lock);
- continue;
+ if (test_and_clear_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
+ &local->client_conn_flags))
+ rxrpc_discard_expired_client_conns(local);
+
+ /* Deal with calls that want immediate attention. */
+ spin_lock_bh(&local->lock);
+ list_splice_tail_init(&local->call_attend_q, &call_attend_q);
+ spin_unlock_bh(&local->lock);
+
+ while ((call = list_first_entry_or_null(&call_attend_q,
+ struct rxrpc_call,
+ attend_link))) {
+ spin_lock_bh(&local->lock);
+ list_del_init(&call->attend_link);
+ spin_unlock_bh(&local->lock);
+ trace_rxrpc_call_poked(call);
+ rxrpc_input_call_event(call);
+ rxrpc_put_call(call, rxrpc_call_put_poke);
}
+ if (!list_empty(&local->new_client_calls))
+ rxrpc_connect_client_calls(local);
+
set_current_state(TASK_INTERRUPTIBLE);
should_stop = kthread_should_stop();
if (!skb_queue_empty(&local->rx_queue) ||