Skip to content

Commit 7c5b420

Browse files
Jon Maloydavem330
authored andcommitted
tipc: reduce risk of wakeup queue starvation
In commit 365ad35 ("tipc: reduce risk of user starvation during link congestion") we allowed senders to add exactly one list of extra buffers to the link backlog queues during link congestion (aka "oversubscription"). However, the criteria for when to stop adding wakeup messages to the input queue when the overload abates is inaccurate, and may cause starvation problems during very high load. Currently, we stop adding wakeup messages after 10 total failed attempts where we find that there is no space left in the backlog queue for a certain importance level. The counter for this is accumulated across all levels, which may lead the algorithm to leave the loop prematurely, although there may still be plenty of space available at some levels. The result is sometimes that messages near the wakeup queue tail are not added to the input queue as they should be. We now introduce a more exact algorithm, where we keep adding wakeup messages to a level as long as the backlog queue has free slots for the corresponding level, and stop at the moment there are no more such slots or when there are no more wakeup messages to dequeue. Fixes: 365ad35 ("tipc: reduce risk of user starvation during link congestion") Reported-by: Tung Nguyen <[email protected]> Acked-by: Ying Xue <[email protected]> Signed-off-by: Jon Maloy <[email protected]> Signed-off-by: David S. Miller <[email protected]>
1 parent f7571cd commit 7c5b420

File tree

1 file changed

+21
-8
lines changed

1 file changed

+21
-8
lines changed

net/tipc/link.c

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -854,18 +854,31 @@ static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
854854
*/
855855
static void link_prepare_wakeup(struct tipc_link *l)
856856
{
857+
struct sk_buff_head *wakeupq = &l->wakeupq;
858+
struct sk_buff_head *inputq = l->inputq;
857859
struct sk_buff *skb, *tmp;
858-
int imp, i = 0;
860+
struct sk_buff_head tmpq;
861+
int avail[5] = {0,};
862+
int imp = 0;
863+
864+
__skb_queue_head_init(&tmpq);
859865

860-
skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
866+
for (; imp <= TIPC_SYSTEM_IMPORTANCE; imp++)
867+
avail[imp] = l->backlog[imp].limit - l->backlog[imp].len;
868+
869+
skb_queue_walk_safe(wakeupq, skb, tmp) {
861870
imp = TIPC_SKB_CB(skb)->chain_imp;
862-
if (l->backlog[imp].len < l->backlog[imp].limit) {
863-
skb_unlink(skb, &l->wakeupq);
864-
skb_queue_tail(l->inputq, skb);
865-
} else if (i++ > 10) {
866-
break;
867-
}
871+
if (avail[imp] <= 0)
872+
continue;
873+
avail[imp]--;
874+
__skb_unlink(skb, wakeupq);
875+
__skb_queue_tail(&tmpq, skb);
868876
}
877+
878+
spin_lock_bh(&inputq->lock);
879+
skb_queue_splice_tail(&tmpq, inputq);
880+
spin_unlock_bh(&inputq->lock);
881+
869882
}
870883

871884
void tipc_link_reset(struct tipc_link *l)

0 commit comments

Comments
 (0)