Skip to content

Commit a5a9fea

Browse files
authored
whisper: fix whisper go routine leak with sync wait group (#20844)
1 parent f0be151 commit a5a9fea

File tree

2 files changed

+18
-0
lines changed

2 files changed

+18
-0
lines changed

whisper/whisperv6/peer.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ type Peer struct {
4444
known mapset.Set // Messages already known by the peer to avoid wasting bandwidth
4545

4646
quit chan struct{}
47+
48+
wg sync.WaitGroup
4749
}
4850

4951
// newPeer creates a new whisper peer object, but does not run the handshake itself.
@@ -64,13 +66,15 @@ func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
6466
// start initiates the peer updater, periodically broadcasting the whisper packets
6567
// into the network.
6668
func (peer *Peer) start() {
69+
peer.wg.Add(1)
6770
go peer.update()
6871
log.Trace("start", "peer", peer.ID())
6972
}
7073

7174
// stop terminates the peer updater, stopping message forwarding to it.
7275
func (peer *Peer) stop() {
7376
close(peer.quit)
77+
peer.wg.Wait()
7478
log.Trace("stop", "peer", peer.ID())
7579
}
7680

@@ -81,7 +85,9 @@ func (peer *Peer) handshake() error {
8185
errc := make(chan error, 1)
8286
isLightNode := peer.host.LightClientMode()
8387
isRestrictedLightNodeConnection := peer.host.LightClientModeConnectionRestricted()
88+
peer.wg.Add(1)
8489
go func() {
90+
defer peer.wg.Done()
8591
pow := peer.host.MinPow()
8692
powConverted := math.Float64bits(pow)
8793
bloom := peer.host.BloomFilter()
@@ -144,6 +150,7 @@ func (peer *Peer) handshake() error {
144150
// update executes periodic operations on the peer, including message transmission
145151
// and expiration.
146152
func (peer *Peer) update() {
153+
defer peer.wg.Done()
147154
// Start the tickers for the updates
148155
expire := time.NewTicker(expirationCycle)
149156
defer expire.Stop()

whisper/whisperv6/whisper.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ type Whisper struct {
8888
stats Statistics // Statistics of whisper node
8989

9090
mailServer MailServer // MailServer interface
91+
92+
wg sync.WaitGroup
9193
}
9294

9395
// New creates a Whisper client ready to communicate through the Ethereum P2P network.
@@ -243,8 +245,10 @@ func (whisper *Whisper) SetBloomFilter(bloom []byte) error {
243245
whisper.settings.Store(bloomFilterIdx, b)
244246
whisper.notifyPeersAboutBloomFilterChange(b)
245247

248+
whisper.wg.Add(1)
246249
go func() {
247250
// allow some time before all the peers have processed the notification
251+
defer whisper.wg.Done()
248252
time.Sleep(time.Duration(whisper.syncAllowance) * time.Second)
249253
whisper.settings.Store(bloomFilterToleranceIdx, b)
250254
}()
@@ -261,7 +265,9 @@ func (whisper *Whisper) SetMinimumPoW(val float64) error {
261265
whisper.settings.Store(minPowIdx, val)
262266
whisper.notifyPeersAboutPowRequirementChange(val)
263267

268+
whisper.wg.Add(1)
264269
go func() {
270+
defer whisper.wg.Done()
265271
// allow some time before all the peers have processed the notification
266272
time.Sleep(time.Duration(whisper.syncAllowance) * time.Second)
267273
whisper.settings.Store(minPowToleranceIdx, val)
@@ -626,10 +632,12 @@ func (whisper *Whisper) Send(envelope *Envelope) error {
626632
// of the Whisper protocol.
627633
func (whisper *Whisper) Start(*p2p.Server) error {
628634
log.Info("started whisper v." + ProtocolVersionStr)
635+
whisper.wg.Add(1)
629636
go whisper.update()
630637

631638
numCPU := runtime.NumCPU()
632639
for i := 0; i < numCPU; i++ {
640+
whisper.wg.Add(1)
633641
go whisper.processQueue()
634642
}
635643

@@ -640,6 +648,7 @@ func (whisper *Whisper) Start(*p2p.Server) error {
640648
// of the Whisper protocol.
641649
func (whisper *Whisper) Stop() error {
642650
close(whisper.quit)
651+
whisper.wg.Wait()
643652
log.Info("whisper stopped")
644653
return nil
645654
}
@@ -874,6 +883,7 @@ func (whisper *Whisper) checkOverflow() {
874883

875884
// processQueue delivers the messages to the watchers during the lifetime of the whisper node.
876885
func (whisper *Whisper) processQueue() {
886+
defer whisper.wg.Done()
877887
var e *Envelope
878888
for {
879889
select {
@@ -892,6 +902,7 @@ func (whisper *Whisper) processQueue() {
892902
// update loops until the lifetime of the whisper node, updating its internal
893903
// state by expiring stale messages from the pool.
894904
func (whisper *Whisper) update() {
905+
defer whisper.wg.Done()
895906
// Start a ticker to check for expirations
896907
expire := time.NewTicker(expirationCycle)
897908

0 commit comments

Comments
 (0)